
APIs

Setting up API Keys

API keys are not required to use the API, but you can set up the Pipeline to enforce the use of API keys. First, in the Helm chart or project resource application configuration, set the useApiKeys value to “true” as shown below.

...
schedule:
  checkFileNotifierQueue: "5000"
  findJobsToStart: "5000"
environment: idata-poc
useApiKeys: "true"
aws:
...
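
If you manage the deployment with Helm, you can also set the value at upgrade time. Here's a minimal sketch, assuming a release named idata-pipeline installed from a local chart directory (both names are hypothetical):

# Use --set-string so the value is the string "true" rather than a YAML boolean
helm upgrade idata-pipeline ./pipeline-chart --set-string useApiKeys=true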

Next, you will need to configure the API keys in AWS Secrets Manager. You can set up one or more API keys for use by the Pipeline API as key/value pairs. When a Pipeline API request is made, the key/value pairs are read from the Secrets Manager secret and validated. The key names are ignored; only the values are used for validation.
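
For example, you could create the secret with the AWS CLI. The secret name and key names below are illustrative; only the UUID values are compared against the x-api-key header:

# Create a secret holding two API keys; the key names are ignored by the Pipeline
aws secretsmanager create-secret \
  --name my-api-keys-secret-name \
  --secret-string '{"ci-pipeline": "d0322db4-f8ac-11ec-b939-0242ac120002", "analysts": "1847626a-5b46-4d43-827c-25f323d9201b"}'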

Save the secret and set the apiKeysSecretName value to the name of the saved secret, as shown below.

aws:
  region: us-east-1
  secretsManager:
    apiKeysSecretName: my-api-keys-secret-name

Version API

1. Retrieve the Pipeline Version

GET http://hostname/version

Header     Value
x-api-key  api-key
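
Here's a curl example (hostname and the api-key value are placeholders):

curl --location --request GET 'http://hostname/version' \
--header 'x-api-key: api-key'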

Dataset Configuration API

This API enables the registration, retrieval and deletion of dataset JSON configurations.

1. Registering a Dataset Configuration

POST http://hostname/dataset

Header        Value
x-api-key     api-key
Content-Type  application/json

Body

{
    [dataset_configuration_here]
}
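
Here's a curl example that registers a configuration stored in a local JSON file (the file name is hypothetical):

curl --location --request POST 'http://hostname/dataset' \
--header 'x-api-key: api-key' \
--header 'Content-Type: application/json' \
--data '@stock_price.dataset.json'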

2. Retrieving a Dataset Configuration

GET http://hostname/dataset?dataset=dataset_name_here

Header        Value
x-api-key     api-key
Content-Type  application/json

Parameters

Parameter  Description
dataset    The name of the dataset to retrieve
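
Here's a curl example, using the stock_price dataset from the examples in this document:

curl --location --request GET 'http://hostname/dataset?dataset=stock_price' \
--header 'x-api-key: api-key'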

3. Delete a Dataset Configuration

DELETE http://hostname/dataset?dataset=dataset_name_here

Header        Value
x-api-key     api-key
Content-Type  application/json

Parameters

Parameter  Description
dataset    The name of the dataset to delete
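
Here's a curl example:

curl --location --request DELETE 'http://hostname/dataset?dataset=stock_price' \
--header 'x-api-key: api-key'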

4. Retrieve all Registered Dataset Configurations

GET http://hostname/datasets

Header     Value
x-api-key  api-key
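
Here's a curl example:

curl --location --request GET 'http://hostname/datasets' \
--header 'x-api-key: api-key'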

Notification Subscription API

When a dataset arrives at its final destination, whether it is S3, Snowflake or Redshift, a notification is sent to the Dataset Notification SNS Topic. If you would like to attach an SQS queue or other type of resource to these notifications, this can be done using the Notification Subscription API.

1. Create a Subscription to a Dataset Notification

POST http://hostname/subscription

Header        Value
x-api-key     api-key
Content-Type  application/json

Body:

{
    "endpointArn": "arn:aws:sqs:us-east-1:196014872813:idata-dev-stock-price",
    "protocol": "sqs",
    "filterPolicy": "{\"prefixKey\": [\"yahoo/finance\"]}"
}
Field         Required  Description
endpointArn   yes       The ARN of the destination of the notification message
protocol      yes       The protocol for the ARN above. Valid values include sqs, http, https, email, email-json, sms, application, lambda, and firehose.
filterPolicy  no        If no filterPolicy is included, the endpointArn receives notifications for all datasets delivered to their destination(s). A filterPolicy lets you filter which notifications the endpoint receives. If included, this field must contain an escaped JSON string. Examples are below.
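
Here's a curl example that creates the subscription shown in the body above (note that filterPolicy is passed as an escaped JSON string):

curl --location --request POST 'http://hostname/subscription' \
--header 'x-api-key: api-key' \
--header 'Content-Type: application/json' \
--data '{
    "endpointArn": "arn:aws:sqs:us-east-1:196014872813:idata-dev-stock-price",
    "protocol": "sqs",
    "filterPolicy": "{\"prefixKey\": [\"yahoo/finance\"]}"
}'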

filterPolicy Examples
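
The filterPolicy field uses SNS subscription filter policy syntax applied to message attributes such as prefixKey, the attribute used in the examples above. The policies below are illustrative sketches; the actual attribute values depend on your dataset configurations.

Match a single value exactly:

{"prefixKey": ["yahoo/finance"]}

Match either of two values:

{"prefixKey": ["yahoo/finance", "rimes/idx_std"]}

Match any value beginning with yahoo/ (SNS prefix matching):

{"prefixKey": [{"prefix": "yahoo/"}]}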

Response:

{
    "owner": "196014872813",
    "subscriptionArn": "arn:aws:sns:us-east-1:196014872813:idata-dev-dataset-notification:cb33899e-c115-4dba-8529-3a76b80aaa93",
    "topicArn": "arn:aws:sns:us-east-1:196014872813:idata-dev-dataset-notification",
    "endpointArn": "arn:aws:sqs:us-east-1:196014872813:idata-dev-rimes",
    "protocol": "sqs",
    "filterPolicy": "{\"prefixKey\": [\"yahoo/finance\"]}"
}

A note on permissions for your endpoint: make sure the endpoint grants the Pipeline dataset notification SNS topic permission to write to it. For example, if your endpoint is an SQS queue, the queue's access policy must allow the SNS topic to send messages to it, as shown below.

SQS Access Policy

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "MySQSPolicy001",
      "Effect": "Allow",
      "Principal": "*",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:196014872813:idata-dev-rimes",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "arn:aws:sns:us-east-1:196014872813:idata-dev-dataset-notification"
        }
      }
    }
  ]
}

2. Get a Subscription

GET http://hostname/subscription?subscriptionarn=[subscriptionArn]

Header     Value
x-api-key  api-key

Parameter        Description
subscriptionarn  The ARN of the subscription to retrieve
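
Here's a curl example, using the subscription ARN returned by the create call above:

curl --location --request GET 'http://hostname/subscription?subscriptionarn=arn:aws:sns:us-east-1:196014872813:idata-dev-dataset-notification:cb33899e-c115-4dba-8529-3a76b80aaa93' \
--header 'x-api-key: api-key'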

Response:

{
    "owner": "196014872813",
    "subscriptionArn": "arn:aws:sns:us-east-1:196014872813:idata-dev-dataset-notification:cb33899e-c115-4dba-8529-3a76b80aaa93",
    "topicArn": "arn:aws:sns:us-east-1:196014872813:idata-dev-dataset-notification",
    "endpointArn": "arn:aws:sqs:us-east-1:196014872813:idata-dev-rimes",
    "protocol": "sqs",
    "filterPolicy": "{\n  \"prefixKey\": [\"yahoo/finance\"]\n}"
}

3. Get All Subscriptions

GET http://hostname/subscriptions

Header     Value
x-api-key  api-key
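
Here's a curl example:

curl --location --request GET 'http://hostname/subscriptions' \
--header 'x-api-key: api-key'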

Response:

[
    {
        "owner": "196014872813",
        "subscriptionArn": "arn:aws:sns:us-east-1:196014872813:idata-dev-dataset-notification:cb33899e-c115-4dba-8529-3a76b80aaa93",
        "topicArn": "arn:aws:sns:us-east-1:196014872813:idata-dev-dataset-notification",
        "endpointArn": "arn:aws:sqs:us-east-1:196014872813:idata-dev-rimes",
        "protocol": "sqs",
        "filterPolicy": "{\n  \"prefixKey\": [\"yahoo/finance\"]\n}"
    }
]

4. Delete a Subscription

DELETE http://hostname/subscription?subscriptionarn=[subscriptionArn]

Header     Value
x-api-key  api-key

Parameter        Description
subscriptionarn  The ARN of the subscription to delete
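
Here's a curl example, using the same subscription ARN as above:

curl --location --request DELETE 'http://hostname/subscription?subscriptionarn=arn:aws:sns:us-east-1:196014872813:idata-dev-dataset-notification:cb33899e-c115-4dba-8529-3a76b80aaa93' \
--header 'x-api-key: api-key'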

Data Ingestion API

To ingest raw data into the Pipeline, you can make use of the Data Ingestion API.

1. Ingest a Data File into the Pipeline

This endpoint employs a multi-part file upload.

POST http://hostname/dataset/upload

Header        Value
x-api-key     api-key
Content-Type  multipart/form-data

Parameter       Required  Description
dataset         yes       The dataset associated with the data file being uploaded
publishertoken  no        Optionally, you can pass a UUID as this value. If none is passed, the Pipeline will automatically generate one.

Here’s a generic curl example:

curl --location --request POST 'https://hostname/dataset/upload?dataset=stock_price' \
--header 'x-api-key: d0322db4-f8ac-11ec-b939-0242ac120002' \
--form 'file=@"stock_price.20190303.dataset.csv"'

Dataset Generation API

You can use this API to infer a dataset configuration and its schema from a raw data file. Note: it's best to use a file with a large amount of data so the inference is accurate.

1. Infer a Dataset with Schema using the REST multi-part upload endpoint

POST http://hostname/dataset/generate

Header        Value
x-api-key     api-key
Content-Type  multipart/form-data

Parameter  Required  Description
file       yes       The multipart file to infer from
dataset    yes       The name of the dataset configuration to generate
delimiter  no        The delimiter of the file to be inferred (e.g. ,)
header     no        If this value is true, the file has a header row

Here’s an example using curl to generate a dataset with a schema from a data file.

curl --location --request POST 'http://localhost:8080/dataset/generate?dataset=stock_price' \
--header 'x-api-key: 1847626a-5b46-4d43-827c-25f323d9201b' \
--form 'file=@"./pipeline-server/test/files/stock_price.20170104.dataset.csv"'

Dataset Status API

You can use this API to build your own dataset monitoring user interface.

1. Retrieve the dataset summary page

This method will retrieve a list of dataset status records sorted by time, most recent first.

GET http://hostname/dataset/status

Header     Value
x-api-key  api-key

Parameter  Required  Description
page       no        The page number to retrieve. If not provided, it defaults to page 1.

Here’s a curl example to make the call:

curl --location --request GET 'http://localhost:8080/dataset/status' \
--header 'x-api-key: 1847626a-5b46-4d43-827c-25f323d9201b' | json_pp

Sample response:

[
    {
        "createdAtTimestamp": "2023-02-06 14:05:35.886",
        "createdAt": 1675710335886,
        "updatedAt": 1675710343926,
        "dataset": "stock_price",
        "pipelineToken": "92adbf1a-94de-5188-8e1d-607233b7a9d2",
        "process": "SnowflakeLoader",
        "startTime": "02-06-2023 14:05:35 EST",
        "endTime": "02-06-2023 14:05:43 EST",
        "totalTime": "8 sec",
        "status": "success"
    },
    {
        "createdAtTimestamp": "2023-02-03 15:35:29.666",
        "createdAt": 1675456529666,
        "updatedAt": 1675456545083,
        "dataset": "cusips",
        "pipelineToken": "29c4de30-52e7-5112-8999-21c298ab34b1",
        "process": "RedshiftLoader",
        "startTime": "02-03-2023 15:35:29 EST",
        "endTime": "02-03-2023 15:35:45 EST",
        "totalTime": "15 sec",
        "status": "success"
    },
    {
        "createdAtTimestamp": "2023-02-03 14:53:06.591",
        "createdAt": 1675453986591,
        "updatedAt": 1675454005096,
        "dataset": "rimes_idx_std",
        "pipelineToken": "588b6bda-ea7a-51d2-89c9-085ae56dc5a0",
        "process": "ObjectStoreLoader",
        "startTime": "02-03-2023 14:53:06 EST",
        "endTime": "02-03-2023 14:53:25 EST",
        "totalTime": "timed out",
        "status": "error"
    },
    ...
]

NOTE: You can use the “pipelineToken” returned in the above call to make the next dataset status call, which returns the detail for a specific Pipeline job.

2. Retrieve the dataset status detail for a specific dataset ingestion

Each time a dataset job is triggered, the Pipeline generates a unique pipeline token for the job. This pipelineToken can be used to query the detail of a specific dataset job.

GET http://hostname/dataset/status

Header     Value
x-api-key  api-key

Parameter      Required  Description
pipelinetoken  yes       The pipeline token of the job whose detail you wish to query

Here's a curl example to make the call:

curl --location --request GET 'http://localhost:8080/dataset/status?pipelinetoken=92adbf1a-94de-5188-8e1d-607233b7a9d2' \
--header 'x-api-key: 1847626a-5b46-4d43-827c-25f323d9201b'

Sample response:

[
    {
        "id": 0,
        "dateTime": "02-06-2023 14:05:35 EST",
        "dataset": "stock_price",
        "processName": "FileNotifier",
        "publisherToken": "92adbf1a-94de-5188-8e1d-607233b7a9d2",
        "pipelineToken": "92adbf1a-94de-5188-8e1d-607233b7a9d2",
        "filename": "stock_price.2023-02-06.14-05-33-271.1675710333271.dataset.csv",
        "state": "begin",
        "code": "info",
        "description": "Data received, bucket: idata-poc-raw, key: temp/stock_price/stock_price.2023-02-06.14-05-33-271.1675710333271.dataset.csv",
        "epoch": 1675710335886
    },
    {
        "id": 0,
        "dateTime": "02-06-2023 14:05:36 EST",
        "dataset": "stock_price",
        "processName": "FileNotifier",
        "publisherToken": "92adbf1a-94de-5188-8e1d-607233b7a9d2",
        "pipelineToken": "92adbf1a-94de-5188-8e1d-607233b7a9d2",
        "filename": "stock_price.2023-02-06.14-05-33-271.1675710333271.dataset.csv",
        "state": "processing",
        "code": "info",
        "description": "Total file size: 9344845",
        "epoch": 1675710336498
    },
    {
        "id": 0,
        "dateTime": "02-06-2023 14:05:36 EST",
        "dataset": "stock_price",
        "processName": "FileNotifier",
        "publisherToken": "92adbf1a-94de-5188-8e1d-607233b7a9d2",
        "pipelineToken": "92adbf1a-94de-5188-8e1d-607233b7a9d2",
        "filename": "stock_price.2023-02-06.14-05-33-271.1675710333271.dataset.csv",
        "state": "end",
        "code": "info",
        "description": "Process completed successfully",
        "epoch": 1675710336623
    },
    {
        "id": 0,
        "dateTime": "02-06-2023 14:05:36 EST",
        "dataset": "stock_price",
        "processName": "JobRunner",
        "publisherToken": "92adbf1a-94de-5188-8e1d-607233b7a9d2",
        "pipelineToken": "92adbf1a-94de-5188-8e1d-607233b7a9d2",
        "filename": "stock_price.2023-02-06.14-05-33-271.1675710333271.dataset.csv",
        "state": "begin",
        "code": "info",
        "description": "Process started",
        "epoch": 1675710336749
    },
    ...
]