• Stars
    star
    138
  • Rank 255,133 (Top 6 %)
  • Language
    Go
  • License
    MIT License
  • Created about 4 years ago
  • Updated about 2 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

A fully-featured AWS Athena database driver (+ athenareader https://github.com/uber/athenadriver/tree/master/athenareader)

CodeCov GoDoc Github release Go Report Card lic made


πŸ“¦ athenadriver - A fully-featured AWS Athena database driver for Go
🐚 athenareader - A moneywise command line utililty to query athena in command line.


Overview

(This project is a sandbox project and the development status is STABLE.)

athenadriver is a fully-featured AWS Athena database driver for Go developed at Uber Technologies Inc. It provides a hassle-free way of querying AWS Athena database with Go standard library. It not only provides basic features of Athena Go SDK, but addresses some SDK's limitation, improves and extends it. Moreover, it also includes advanced features like Athena workgroup and tagging creation, driver read-only mode and so on.

The PDF version of AthenaDriver document is available at πŸ“œ

Features

Except the basic features provided by Go database/sql like error handling, database pool and reconnection, athenadriver supports the following features out of box:

  • Support multiple AWS authorization methods πŸ”—
  • Full support of Athena Basic Data Types
  • Full support of Athena Advanced Type for queries with Geospatial identifiers, ML and UDFs
  • Full support of ALL Athena Query Statements, including DDL, DML and UTILITY πŸ”—
  • Support newly added INSERT INTO...VALUES
  • Athena workgroup and tagging support including remote workgroup creation πŸ”—
  • Go sql's prepared statement support πŸ”—
  • Go sql's DB.Exec() and db.ExecContext() support πŸ”—
  • Query cancelling support πŸ”—
  • Override default query timeout limits πŸ”—
  • Mask columns with specific values πŸ”—
  • Database missing value handling πŸ”—
  • Read-Only mode - disable database write in driver level πŸ”—
  • Moneywise mode πŸ’° - print out query cost(USD) for each query
  • Query with Athena Query ID(QID) - (the ultimate money saver! πŸ’Έ )
  • Pseudo commands from database/sql interface: get_driver_version, get_query_id, get_query_id_status, stop_query_id, get_workgroup, list_workgroups, update_workgroup, get_cost, get_execution_report etc πŸ”—
  • Builtin logging support with zap πŸ”—
  • Builtin metrics support with tally πŸ”—

athenadriver can extremely simplify your code. Check athenareader out as an example and a convenient tool for your Athena query in command line.

How to set up/install/test athenadriver

Prerequisites - AWS Credentials & S3 Query Result Bucket

To be able to query AWS Athena, you need to have an AWS account at Amazon AWS's website. To give it a shot, a free tier account is enough. You also need to have a pair of AWS access key ID and secret access key. You can get it from AWS Security Credentials section of Identity and Access Management (IAM). If you don't have one, please create it. The following is a screenshot from my temporary free tier account:

How to create AWS credentials

In addition to AWS credentials, you also need an s3 bucket to store query result. Just go to AWS S3 web console page to create one. In the examples below, the s3 bucket I use is s3://myqueryresults/.

In most cases, you need the following 4 prerequisites:

  • S3 Output bucket
  • access key ID
  • secret access key
  • AWS region

For more details on athenadriver's support on AWS credentials & S3 query result bucket, please refer to section Support Multiple AWS Authorization Methods.

Installation

Before Go 1.17, go get can be used to install athenadriver:

go get -u github.com/uber/athenadriver

Starting in Go 1.17, installing executables with go get is deprecated. go install may be used instead.

go install github.com/uber/athenadriver@latest

Tests

We provide unit tests and integration tests in the codebase.

Unit Test

All the unit tests are self-contained and passed even in no-internet environment. Test coverage is 100%.

$ cd $GOPATH/src/github.com/uber/athenadriver/go
βœ” /opt/share/go/path/src/github.com/uber/athenadriver [uber|✚ 1…12] 
21:35 $ go test -coverprofile=coverage.out github.com/uber/athenadriver/go  && \
 go tool cover -func=coverage.out |grep -v 100.0%
ok  	github.com/uber/athenadriver/go	9.255s	coverage: 100.0% of statements

Integration Test

All integration tests are under examples folder. Please make sure all prerequisites are met so that you can run the code on your own machine.

All the code snippets in examples folder are fully tested in our machines. For example, to run some stress and crash test, you can use examples/perf/concurrency.go. Build it first:

$cd $GOPATH/src/github.com/uber/athenadriver
$go build examples/perf/concurrency.go

Run it, wait for some output but not all, and unplug your network cable:

$./concurrency > examples/perf/concurrency.output.`date +"%Y-%m-%d-%H-%M-%S"`.log
58,13,53,54,78,96,32,48,40,11,35,31,65,61,1,73,74,22,34,49,80,5,69,37,0,79,
2020/02/09 13:49:29 error [38]RequestError: send request failed
caused by: Post https://athena.us-east-1.amazonaws.com/: dial tcp: 
lookup athena.us-east-1.amazonaws.com: no such host
...
2020/02/09 13:49:29 error [89]RequestError: send request failed
caused by: Post https://athena.us-east-1.amazonaws.com/: dial tcp: 
lookup athena.us-east-1.amazonaws.com: no such host

You can see RequestError is thrown out from the code. The active Athena queries failed because the network is down. Now re-plugin your cable and wait for network coming back, you can see the program automatically reconnects to Athena, and resumes to output data correctly:

72,25,92,98,15,93,41,7,8,90,81,56,66,2,18,84,87,63,
44,45,82,99,86,3,52,76,71,16,39,67,23,12,42,17,4,

How to use athenadriver

athenadriver is very easy to use. What you need to do it to import it in your code and then use the standard Go database/sql as usual.

import athenadriver "github.com/uber/athenadriver/go"

The following are coding examples to demonstrate athenadriver's features and how you should use athenadriver in your Go application. Please be noted the code is for demonstration purpose only, so please follow your own coding style or best practice if necessary.

Get Started - A Simple Query

The following is the simplest example for demonstration purpose. The source code is available at dml_select_simple.go.

package main

import (
	"database/sql"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// Step 1. Set AWS Credential in Driver Config.
	conf, _ := drv.NewDefaultConfig("s3://myqueryresults/",
		"us-east-2", "DummyAccessID", "DummySecretAccessKey")
	// Step 2. Open Connection.
	db, _ := sql.Open(drv.DriverName, conf.Stringify())
	// Step 3. Query and print results
	var url string
	_ = db.QueryRow("SELECT url from sampledb.elb_logs limit 1").Scan(&url)
	println(url)
}

To make it work for you, please replace OutputBucket, Region, AccessID and SecretAccessKey with your own values. sampledb is provided by Amazon so you don't have to worry about it.

To Build it:

$ go build examples/query/dml_select_simple.go 

Run it and you can see output like:

$ ./dml_select_simple 
https://www.example.com/articles/553

Support Multiple AWS Authentication Methods

athenadriver uses access keys(Access Key ID and Secret Access Key) to sign programmatic requests to AWS. When if the AWS_SDK_LOAD_CONFIG environment variable was set, athenadriver uses Shared Config, respects AWS CLI Configuration and Credential File Settings and gives it even higher priority over the values set in athenadriver.Config.

Use AWS CLI Config For Authentication

When environment variable AWS_SDK_LOAD_CONFIG is set, it will read aws_access_key_id(AccessID) and aws_secret_access_key(SecretAccessKey) from ~/.aws/credentials, region from ~/.aws/config. For details about ~/.aws/credentials and ~/.aws/config, please check here.

But you still need to specify correct OutputBucket in athenadriver.Config because it is not in the AWS client config.

OutputBucket is critical in Athena. Even if you have a default value set in Athena web console, you must pass one programmatically or you will get error: No output location provided. An output location is required either through the Workgroup result configuration setting or as an API input.

The sample code below enforces AWS_SDK_LOAD_CONFIG is set, so athenadriver's AWS Session will be created from the configuration values from the shared config (~/.aws/config) and shared credentials (~/.aws/credentials) files. Even if we pass all dummy values as parameters in NewDefaultConfig() except OutputBucket, they are overridden by the values in AWS CLI config files, so it doesn't really matter.

// To use AWS CLI's Config for authentication
func useAWSCLIConfigForAuth() {
	os.Setenv("AWS_SDK_LOAD_CONFIG", "1")
	// 1. Set AWS Credential in Driver Config.
	conf, err := drv.NewDefaultConfig(secret.OutputBucketProd, drv.DummyRegion,
		drv.DummyAccessID, drv.DummySecretAccessKey)
	if err != nil {
		return
	}
	// 2. Open Connection.
	db, _ := sql.Open(drv.DriverName, conf.Stringify())
	// 3. Query and print results
	var i int
	_ = db.QueryRow("SELECT 456").Scan(&i)
	println("with AWS CLI Config:", i)
	os.Unsetenv("AWS_SDK_LOAD_CONFIG")
}

If your AWS CLI setting is valid like mine, this function should output:

with AWS CLI Config: 456

The above authentication method also works for querying Athena in AWS Lambda. In lambda, you don't have to provide access ID, key and region, and you don't need AWS CLI config files either. You just need to specify the correct output bucket. Please check the AWS Lambda Go same code here.

Use athenadriver Config For Authentication

When environment variable AWS_SDK_LOAD_CONFIG is NOT set, you may explicitly define credentials by passing valid (NOT dummy) accessID, secretAccessKey, region, and outputBucket into athenadriver.NewDefaultConfig().

The sample code below ensure AWS_SDK_LOAD_CONFIG is not set, then pass four valid parameters into NewDefaultConfig():

// To use athenadriver's Config for authentication
func useAthenaDriverConfigForAuth() {
	os.Unsetenv("AWS_SDK_LOAD_CONFIG")
	// 1. Set AWS Credential in Driver Config.
	conf, err := drv.NewDefaultConfig(secret.OutputBucketDev, secret.Region,
		secret.AccessID, secret.SecretAccessKey)
	if err != nil {
		return
	}
	// 2. Open Connection.
	db, _ := sql.Open(drv.DriverName, conf.Stringify())
	// 3. Query and print results
	var i int
	_ = db.QueryRow("SELECT 123").Scan(&i)
	println("with AthenaDriver Config:", i)
}

The sample output:

with AthenaDriver Config: 123

The full code is here at examples/auth.go.

Use AWS SDK Default Credentials Resolution for Authentication

If environment variable AWS_SDK_LOAD_CONFIG is NOT set and credentials are not supplied in the athenadriver configuration, the AWS SDK will look up credentials using its default methodology described here: https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials.

Region and OutputBucket bucket still need to be explictly defined.

The sample code below ensures AWS_SDK_LOAD_CONFIG is not set, then creates a athenadriver config with OutputBucket and Region values set.

// To use AWS SDK Default Credentials
func useAthenaDriverConfigForAuth() {
	os.Unsetenv("AWS_SDK_LOAD_CONFIG")
	// 1. Set OutputBucket and Region in Driver Config.
	conf := drv.NewNoOpsConfig()
	conf.SetOutputBucket(secret.OutputBucketDev)
	conf.SetRegion(secret.Region)

	// 2. Open Connection.
	db, _ := sql.Open(drv.DriverName, conf.Stringify())
	// 3. Query and print results
	var i int
	_ = db.QueryRow("SELECT 123").Scan(&i)
	println("with AthenaDriver Config:", i)
}

The sample output:

with AthenaDriver Config: 123

Full Support of All Data Types

As we said, athenadriver supports all Athena data types. In the following sample code, we use an SQL statement to SELECT som simple data of all the advanced types and then print them out.

package main

import (
	"context"
	"database/sql"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// 1. Set AWS Credential in Driver Config.
	conf, err := drv.NewDefaultConfig("s3://myqueryresults/",
		"us-east-2", "DummyAccessID", "DummySecretAccessKey")
	if err != nil {
		panic(err)
	}
	// 2. Open Connection.
	dsn := conf.Stringify()
	db, _ := sql.Open(drv.DriverName, dsn)
	// 3. Query and print results
	query := "SELECT JSON '\"Hello Athena\"', " +
		"ST_POINT(-74.006801, 40.70522), " +
		"ROW(1, 2.0),  INTERVAL '2' DAY, " +
		"INTERVAL '3' MONTH, " +
		"TIME '01:02:03.456', " +
		"TIME '01:02:03.456 America/Los_Angeles', " +
		"TIMESTAMP '2001-08-22 03:04:05.321 America/Los_Angeles';"
	rows, err := db.Query(query)
	if err != nil {
		panic(err)
	}
	defer rows.Close()
	println(drv.ColsRowsToCSV(rows))
}

Sample output:

"Hello Athena",00 00 00 00 01 01 00 00 00 20 25 76 6d 6f 80 52 c0 18 3e 22 a6 44 5a 44 40,
{field0=1, field1=2.0},2 00:00:00.000,0-3,0000-01-01T01:02:03.456-07:52,
0000-01-01T01:02:03.456-07:52,2001-08-22T03:04:05.321-07:00

we can see athenadriver can handle all these advanced types correctly.

Query With Workgroup and Tag

athenadriver supports workgroup and tagging features of Athena. When you query Athena, you can specify the workgroup and tags attached with your query. Resource/cost tagging are based on workgroup. If the workgroup doesn't exist , by default it will be created programmatically.

If you want to disable programmatically creating workgroup and tags, you need to explicitly call:

Config.SetWGRemoteCreationAllowed(false)

In this case, you need to make sure the workgroup you specifies must exist, or you will get error. An example is like below:

package main

import (
	"database/sql"
	"log"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// 1. Set AWS Credential in Driver Config.
	conf, _ := drv.NewDefaultConfig("s3://myqueryresults/",
		"us-east-2", "DummyAccessID", "DummySecretAccessKey")
	wgTags := drv.NewWGTags()
	wgTags.AddTag("Uber User", "henry.wu")
	wgTags.AddTag("Uber ID", "123456")
	wgTags.AddTag("Uber Role", "SDE")
	// Specify that workgroup `henry_wu` is used for the following query
	wg := drv.NewWG("henry_wu", nil, wgTags)
	conf.SetWorkGroup(wg)
	// comment out the line below to allow remote workgroup creation and
	// the query will be successful!!!
	conf.SetWGRemoteCreationAllowed(false)
	// 2. Open Connection.
	dsn := conf.Stringify()
	db, _ := sql.Open(drv.DriverName, dsn)
	// 3. Query and print results
	rows, err := db.Query("select url from sampledb.elb_logs limit 3")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer rows.Close()

	var url string
	for rows.Next() {
		if err := rows.Scan(&url); err != nil {
			log.Fatal(err)
		}
		println(url)
	}
}

But I don't have a workgroup named henry_wu in AWS Athena, so I got sample output:

2020/01/20 15:29:52 Workgroup henry_wu doesn't exist and workgroup remote creation
 is disabled.

After commenting out conf.SetWGRemoteCreationAllowed(false) at line 27, the output becomes:

https://www.example.com/articles/553
http://www.example.com/images/501
https://www.example.com/images/183

and I can see a new workgroup named henry_wu is created in AWS Athena console: https://us-east-2.console.aws .amazon.com/athena/workgroups/home

Athena Workgroup and Tags Automatic Creation

Prepared Statement Support for Athena DB

Athena doesn't support prepared statement originally. However, it could be very helpful in some scenarios like where part of the query is from user input. athenadriver supports prepared statements to help you to deal with those scenarios. An example is as follows:

package main

import (
	"database/sql"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// 1. Set AWS Credential in Driver Config.
	conf, _ := drv.NewDefaultConfig("s3://myqueryresults/",
		"us-east-2", "DummyAccessID", "DummySecretAccessKey")
	// 2. Open Connection.
	db, _ := sql.Open(drv.DriverName, conf.Stringify())
	// 3. Prepared Statement
	statement, err := db.Prepare("CREATE TABLE sampledb.urls AS " +
		"SELECT url FROM sampledb.elb_logs where request_ip=? limit ?")
	if err != nil {
		panic(err)
	}
	// 4. Execute prepared Statement
	if result, e := statement.Exec("244.157.42.179", 2); e == nil {
		if rowsAffected, err := result.RowsAffected(); err == nil {
			println(rowsAffected)
		}
	}
}

Sample output:

2

You can also use the ? syntax with DB.Query() or DB.Exec() directly.

	rows, err := db.Query("SELECT request_timestamp,elb_name "+
		"from sampledb.elb_logs where url=? limit 1",
		"https://www.example.com/jobs/878")
	if err != nil {
		return
	}
	println(drv.ColsRowsToCSV(rows))

Sample Output:

request_timestamp,elb_name
2015-01-06T04:03:01.351843Z,elb_demo_006

DB.Exec() and DB.ExecContext()

According to Go source code, DB.Exec() and DB.ExecContext() execute a query that doesn't return rows, such as an INSERT or UPDATE. It's true that you can use DB.Exec() and DB.Query() interchangeably to execute the same SQL statements.
However, the two methods are for different use cases and return different types of results. According to Go database/sql library, the result returned from DB.Exec() can tell you how many rows were affected by the query and the last inserted ID for INSERT INTO statement, which is always -1 for Athena because auto-increment primary key feature is not supported by Athena.
In contrast, DB.Query() will return a sql.Rows object which includes all columns and rows details.

When the only concern is if the execution is successful or not, DB.Exec() is preferred to DB.Query() . The best coding practice is:

if _, err := DB.Exec(`<SQL_STATEMENT>`); err != nil {
    log_or_panic(err)
}

In cases of INSERT INTO, CTAG and CVAS, you may want to know when the execution is successful how many rows are affected by your query. Then you can use result.RowsAffected() as demonstrated in the following example:

package main

import (
	"context"
	"database/sql"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// 1. Set AWS Credential in Driver Config.
	var conf *drv.Config
	var err error
	if conf, err = drv.NewDefaultConfig("s3://myqueryresults/",
		"us-east-2", "DummyAccessID", "DummySecretAccessKey"); err != nil {
		panic(err)
	}
	// 2. Open Connection.
	db, _ := sql.Open(drv.DriverName, conf.Stringify())
	// 3. Execute and print results
	if _, err = db.ExecContext(context.Background(),
		"DROP TABLE IF EXISTS sampledb.urls"); err != nil {
		panic(err)
	}

	var result sql.Result
	if result, err = db.Exec("CREATE TABLE sampledb.urls AS "+
		"SELECT url FROM sampledb.elb_logs where request_ip=? limit ?",
		"244.157.42.179", 1); err != nil {
		panic(err)
	}
	println(result.RowsAffected())

	if result, err = db.Exec("INSERT INTO sampledb.urls VALUES (?),(?),(?)",
		"abc", "efg", "xyz"); err != nil {
		panic(err)
	}
	println(result.RowsAffected())
	println(result.LastInsertId()) // not supported by Athena
}

Sample output:

1
3

Mask Columns with Specific Values

Sometimes, database contains sensitive information and you may need to mask columns with specific values. If you don't want to display some columns, you can mask them by calling:

Config.SetMaskedColumnValue("columnName", "maskValue")

For example, if you want to mask all rows of column password, you can specify:

Config.SetMaskedColumnValue("password", "xxx")

Then all the passwords will be displayed as xxx in the query result set. The following is an example to mask column url in the result set.

package main

import (
	"database/sql"
	"log"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// 1. Set AWS Credential in Driver Config.
	conf, _ := drv.NewDefaultConfig("s3://myqueryresults/",
		"us-east-2", "DummyAccessID", "DummySecretAccessKey")
	conf.SetMaskedColumnValue("url", "xxx")
	// 2. Open Connection.
	dsn := conf.Stringify()
	db, _ := sql.Open(drv.DriverName, dsn)
	// 3. Query and print results
	rows, err := db.Query("select request_timestamp, url from " +
		"sampledb.elb_logs limit 3")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer rows.Close()

	var requestTimestamp string
	var url string
	for rows.Next() {
		if err := rows.Scan(&requestTimestamp, &url); err != nil {
			log.Fatal(err)
		}
		println(requestTimestamp + "," + url)
	}
}

Sample Output:

2015-01-03T12:00:00.516940Z,xxx
2015-01-03T12:00:00.902953Z,xxx
2015-01-03T12:00:01.206255Z,xxx

Query Cancellation

AWS Athena is priced upon the data size it scanned. To save money, athenadriver supports query cancellation. In the following example, the query is cancelled if it is not complete after 2 seconds.

package main

import (
	"context"
	"database/sql"
	"log"
	"time"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// 1. Set AWS Credential in Driver Config.
	conf, _ := drv.NewDefaultConfig("s3://myqueryresults/",
		"us-east-2", "DummyAccessID", "DummySecretAccessKey")

	// 2. Open Connection.
	dsn := conf.Stringify()
	db, _ := sql.Open(drv.DriverName, dsn)
	// 3. Query cancellation after 2 seconds
	ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
	rows, err := db.QueryContext(ctx, "select count(*) from sampledb.elb_logs")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer rows.Close()

	var requestTimestamp string
	var url string
	for rows.Next() {
		if err := rows.Scan(&requestTimestamp, &url); err != nil {
			log.Fatal(err)
		}
		println(requestTimestamp + "," + url)
	}
}

Sample Output:

2020/01/20 15:28:35 context deadline exceeded

Overriding Athena Service Limits for Query Timeout

This library assumes default Athena service limits for DDL and DML query timeouts, as can be found in athenadriver/go/constants.go. If you've increased your service limits, for example via the Athena Service Quotas console, you can override them on your Config.

Here's the same example found at Query Cancellation, but with an increased query timeout.

package main

import (
	"context"
	"database/sql"
	"log"
	"time"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// 1. Set AWS Credential in Driver Config.
	conf, _ := drv.NewDefaultConfig("s3://myqueryresults/",
		"us-east-2", "DummyAccessID", "DummySecretAccessKey")
	
	// 2. Override the DML query timeout to 60 minutes (3600 seconds). 
	serviceLimitOverride := drv.NewServiceLimitOverride()
	serviceLimitOverride.SetDMLQueryTimeout(3600)
	conf.SetServiceLimitOverride(*serviceLimitOverride)

	// 3. Open Connection.
	dsn := conf.Stringify()
	db, _ := sql.Open(drv.DriverName, dsn)
	
	// 4. Run the query.
	rows, err := db.QueryContext(context.Background(), "select count(*) from sampledb.elb_logs")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer rows.Close()

	var requestTimestamp string
	var url string
	for rows.Next() {
		if err := rows.Scan(&requestTimestamp, &url); err != nil {
			log.Fatal(err)
		}
		println(requestTimestamp + "," + url)
	}
}

Missing Value Handling

It is common to have missing values in S3 file, or Athena DB. When this happens, you can specify if you want to use empty string, default data, or nil as the missing value, whichever is better to facilitate your data processing or ETL job. The default data for Athena column type are defined as below:

func (r *Rows) getDefaultValueForColumnType(athenaType string) interface{} {
	switch athenaType {
	case "tinyint", "smallint", "integer", "bigint":
		return 0
	case "boolean":
		return false
	case "float", "double", "real":
		return 0.0
	case "date", "time", "time with time zone", "timestamp",
		"timestamp with time zone":
		return time.Time{}
	default:
		return ""
	}
}

By default, we use empty string to replace missing values and empty string is preferred to default data, or nil. To use default data, you have to explicitly call:

Config.SetMissingAsEmptyString(false)
Config.SetMissingAsDefault(true)

If you need to use nil as missing value, you can call:

Config.SetMissingAsEmptyString(false)
Config.SetMissingAsDefault(false)
Config.SetMissingAsNil(true)

But if you are strict with your data integrity and want an error raised when data are missing, you can set all three of them to false.

Read-Only Mode

When read-only mode is enabled in athenadriver, it only allows retrieving information from Athena database. Any writing and modification to the database will raise an error. This is useful in some cases. By default, read-only mode is disabled. To enable it, you need to explicitly call:

Config.SetReadOnly(true)

The following is one example. It enables read-only mode in line 19, but tries to create a new table with CTAS statement. It ends up with raising an error.

package main

import (
	"context"
	"database/sql"
	"log"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// 1. Set AWS Credential in Driver Config.
	conf, _ := drv.NewDefaultConfig("s3://myqueryresults/",
		"us-east-2", "DummyAccessID", "DummySecretAccessKey")
	conf.SetReadOnly(true)

	// 2. Open Connection.
	dsn := conf.Stringify()
	db, _ := sql.Open(drv.DriverName, dsn)
	// 3. Create Table with CTAS statement
	rows, err := db.QueryContext(context.Background(), 
	  "CREATE TABLE sampledb.elb_logs_new AS " +
		"SELECT * FROM sampledb.elb_logs limit 10;")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer rows.Close()
}

Sample Output:

2020/01/26 01:10:28 writing to Athena database is disallowed in read-only mode

Pseudo Commands

athenadriver provides pseudo command to support some special use cases beyond Go's standard database/sql framework. One sample use case is Asynchronous Query Support. pseudo command is a special prefix string you can put in db.QueryContext or db.QueryRow or db.ExecuteContext etc.

It is easier to explain with an example like pc_get_query_id.go.

package main

import (
	"database/sql"
	"os"
	secret "github.com/uber/athenadriver/examples/constants"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// 1. Set AWS Credential in Driver Config.
	os.Setenv("AWS_SDK_LOAD_CONFIG", "1")
	conf, err := drv.NewDefaultConfig(secret.OutputBucket, secret.Region, secret.AccessID, secret.SecretAccessKey)
	conf.SetLogging(true)
	if err != nil {
		panic(err)
		return
	}

	// 2. Open Connection.
	dsn := conf.Stringify()
	db, _ := sql.Open(drv.DriverName, dsn)

	// 3. Query with pseudo command `pc:get_query_id`
	var qid string
	_ = db.QueryRow("pc:get_query_id select url from sampledb.elb_logs limit 2").Scan(&qid)
	println("Query ID: ", qid)
}

In pc_get_query_id.go, we only want to get the Query ID of the SQL statement, so we just to add pc:get_query_id before the sql statement. So the final string we pass to db.QueryRow is pc:get_query_id select url from sampledb.elb_logs limit 2. The return value is one row with an Athena Query ID inside. A sample Output is:

Query ID: c89088ab-595d-4ee6-a9ce-73b55aeb8953

Now we support three pseudo commands: get_query_id, get_query_id_status, stop_query_id.

The syntax is pc:pseudo_command parameter.

get_query_id

pc:get_query_id SQL_STATEMENT - Will return Query ID of the SQL_STATEMENT, no matter request fails or succeeds. Example: pc_get_query_id.go.

get_query_id_status

pc:get_query_id_status Query_ID - Return status of the Query ID. Example: pc_get_query_id_status.go.

stop_query_id

pc:stop_query_id Query_ID - To stop the Query corresponding the Query ID. If there is no error, a one row string with OK will be returned. Example: pc_stop_query_id.go.

get_driver_version

pc:get_driver_version - To return the version of athenadriver. Example: pc_get_driver_version.go.

Enable Driver Logging

You can enable driver logging to help you to debug, monitoring and know more details about the running system. Logging is by default enabled and implemented as a no-op Logger. You need to pass a workable logger to make it work. If you don't want to log at all, you need to explicitly call:

  Config.SetLogging(false)

The following example is to pass in a zap Production logger.

package main

import (
	"context"
	"database/sql"
	"log"
	"time"
	"go.uber.org/zap"
	drv "github.com/uber/athenadriver/go"
)

func main() {
	// 1. Set AWS Credential in Driver Config.
	conf, _ := drv.NewDefaultConfig("s3://query-results-bucket-test/",
		"us-east-2",
		"dummy-to-be-replaced",
		"dummy-to-be-replaced")

	// 2. Open Connection.
	dsn := conf.Stringify()
	db, _ := sql.Open(drv.DBDriverName, dsn)

	logger, _ := zap.NewProduction()
	defer logger.Sync()
	// 3. Query cancellation after 2 seconds
	ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
	ctx = context.WithValue(ctx, drv.LoggerKey, logger)
	rows, err := db.QueryContext(ctx, "select count(*) from sampledb.elb_logs")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer rows.Close()
}

Sample Output:

{"level":"warn","ts":1579556666.3372262,"caller":"athenadriver/observability.go:72","msg":"query canceled","resp.QueryExecutionId":
 "ef4f3f09-a480-445c-84ad-96ecd97a8e90"}
2020/01/20 13:44:26 context deadline exceeded

Enable Metrics

athenadriver supports tally metrics reporting builtin. Metrics reporting is by default enabled but implemented as a no-op Scope. You need to pass a workable scope to make it work. If you don't want metrics at all, you need to explicitly call:

  Config.SetMetrics(false)

The following example is to pass in a scope with statsd reporter.

package main

import (
	"context"
	"database/sql"
	"io"
	"log"
	"time"
	"github.com/cactus/go-statsd-client/statsd"
	tallystatsd "github.com/uber-go/tally/statsd"
	drv "github.com/uber/athenadriver/go"
	"github.com/uber-go/tally"
)

func newScope() (tally.Scope, io.Closer) {
	statter, _ := statsd.NewBufferedClient("127.0.0.1:8125", "stats", 100*time.Millisecond, 1440)
	reporter := tallystatsd.NewReporter(statter, tallystatsd.Options{
		SampleRate: 1.0,
	})
	scope, closer := tally.NewRootScope(tally.ScopeOptions{
		Prefix:   "my_test_metrics_service",
		Tags:     map[string]string{},
		Reporter: reporter,
	}, time.Second)
	return scope, closer
}

func main() {
	// 1. Set AWS Credential in Driver Config.
	conf, _ := drv.NewDefaultConfig("s3://query-results-bucket-test/",
		"us-east-2", "dummy-to-be-replaced", "dummy-to-be-replaced")

	// 2. Open Connection.
	dsn := conf.Stringify()
	db, _ := sql.Open(drv.DBDriverName, dsn)

	// 3. Query cancellation after 2 seconds
	// Create tally scope
	scope, _ := newScope()
	// Create context and attach tally scope with context
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	ctx = context.WithValue(ctx, drv.MetricsKey, scope)
	rows, err := db.QueryContext(ctx, "select count(*) from sampledb.elb_logs")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer rows.Close()
}

Run netcat(nc) in another terminal to listen at port 8125 with command:

nc 8125 -l -u

Then run the code above, you can see the underlying details of driver are reported as metrics like below:

$ nc 8125 -l -u
stats.my_test_metrics_service.awsathena.connector.connect:0.140147|ms
stats.my_test_metrics_service.awsathena.query.workgroup:0.000607|ms
stats.my_test_metrics_service.awsathena.query.startqueryexecution:1191.644566|ms
stats.my_test_metrics_service.awsathena.query.queryexecutionstatesucceeded:3320.820154|ms

Limitations of Go/Athena SDK's and athenadriver's Solution

Column number mismatch in GetQueryResults of Athena Go SDK

ColumnInfo has more number of cloumns than Rows[0].Data

Affected Statements: DESCRIBE TABLE/VIEW, SHOW SCHEMA/TABLE/...

  • Sample Query:
DESC sampledb.elb_logs
  • Analysis:

Column number mismatch issue example 1

We can see there are 3 columns according to ColumnInfo under ResultSetMetadata. But in the first row Rows[0], we see there is only 1 field: "elb_name \tstring \t ". I would imagine there could have been 3 items in the Data[0], but somehow the code author doesn't split it with tab(\t), so it ends up with only 1 item. The same issue happens for SHOW statement.

For more sample code, please check util_desc_table.go, util_desc_view.go, and util_show.go.

  • athenadriver's Solution:

athenadriver fixes this issue by splitting Rows[0].Data[0] string with tab, and replace the original row with a new row which has the same number of data with columns.

ColumnInfo has cloumns but Rows are empty

Affected Statements: CTAS, CVAS, INSERT INTO

Sample Query:

CREATE TABLE sampledb.elb_logs_copy WITH (
    format = 'TEXTFILE',
    external_location = 's3://external-location-test/elb_logs_copy', 
    partitioned_by = ARRAY['ssl_protocol'])
AS SELECT * FROM sampledb.elb_logs

Analysis:

Column number mismatch issue example 2

In the above CTAS statement, we see there is one column of type bigint named "rows" in the resultset, but ResultSet.Rows is empty. Since there is no row, that one column doesn't make sense, or at least is confusing. The same issue happens for INSERT INTO statement.

  • athenadriver's Solution:

Because this issue happens only in statements CTAS, CVAS, and INSERT INTO , where UpdateCount is always valid and is the only meaningful information returned from Athena, athenadriver sets UpdateCount as the value of the returned row.

For more sample code, please check ddl_ctas.go, ddl_cvas.go, and dml_insert_into.go.

Type Loss for map, struct, array etc

One of Athena Go SDK's limitations is the type information could be lost after querying. I think there are two reasons for this type information loss.

The first reason is Athena SDK doesn't provide the full type information for complex type data. It assumes the application developers know the data schema and should take the responsibility of data serialization.

To dig into the code, all query results are stored in data structure ResultSet. From the UML class graph of ResultSet below, we can see the type information are stored in ColumnInfo's pointer to string variable Type, which is only a type name of data type, not containing any type metadata. For example, querying a map of string->boolean will return the type name map, but you cannot find the information string->boolean in the ResultSet. For simple type like integer, boolean or string, it is sufficient to serialize them to Go type, but for more complex types like array, struct, map or nested types, the type information is lost here.

UML class graph of ResultSet

The second reason is the difference between Athena data type and Go data type. Some Athena builtin data type like Row, DECIMAL(p, s), varbinary, interval year to month are not supported in Go standard library. Therefore, there is no way to serialize them in driver level.

  • athenadriver's Solution:

For data types: array, map, json, char, varchar, varbinary, row, string, binary, struct, interval year to month, interval day to second, decimal, athenadriver returns the string representation of the data. The developers can firstly retrieve the string representation, and then serialize to user defined type on their own.

For time and date types: date, time, time with time zone, timestamp, timestamp with time zone, athenadriver returns Go's time.Time.

Some sample code are available at dml_select_array.go, dml_select_map.go, dml_select_time.go.

FAQ

The following is a collection of questions from our software developers and data scientists.

Does athenadriver support database reconnection?

Yes. database/sql maintains a connection pool internally and handles connection pooling, reconnecting, and retry logic for you. One pitfall of writing Go sql application is cluttering the code with error-handling and retry. I tested in my application with athenadriver by turning off and on Wifi and VPN, it works very well with database reconnection.

Does athenadriver support batched query?

No. athenadriver is an implementation of sql.driver in Go database/sql, where there is no batch query support. There might be some workaround for some specific case though. For instance, if you want to insert many rows, you can use db.Exec by replacing multiple inserts with one insert and multiple VALUES.

How to use athenadriver to get total row number of result set?

You have to use rows.Next() to iterate all rows and use a counter to get row number. It is because Go database/sql was designed in a streaming query way with big data considered. That is why it only supports using Next() to iterate. So there is no way for random access of row. In Athena case, we only have random access of all the rows within one result page as the picture shown below:

Encapsulation of driver.Rows in sql.Rows

But due to encapsulation, more sepcifically the rowsi is private, we cannot access it directly like when we using Athena Go SDK. We have to use Next() to access it one by one.

Is there any way to randomly access row with athenadriver?

No. The reason is the same as answer to the previous question.

Does athenadriver support getting the rows affected by my query?

To put it simple, YES. But there is some limitation and best practice to follow.

The recommended way is to use DB.Exec() to get it. Please refer to πŸ”— .

You can get it with DB.Query() too. In the returned ResultSet, there is an UpdateCount member variable. If the query is one of CTAS, CVAS and INSERT INTO, UpdateCount will contain meaningful value. The result will be of a one row and one column. The column name is rows, and the row is an int, which is exactly UpdateCount. I would suggest to use QueryRow or QueryRowContext since it is a one-row result. By the way, the document for GetQueryResults seems not very accurate.

UpdateCount for CTAS, VTAS, and INSERT INTO

In practice, not only CTAS statement but also CVAS and INSERT INTO will make a meaningful UpdateCount.

Development Status: Stable

All APIs are finalized, and no breaking changes will be made in the 1.x series of releases.

This library is now at version 1 and follows SemVer strictly.

Contributing

We encourage and support an active, healthy community of contributors β€” including you! Details are in the contribution guide and the code of conduct. The athenadriver maintainers keep an eye on issues and pull requests, but you can also report any negative conduct to [email protected]. That email list is a private, safe space; even the athenadriver maintainers don't have access, so don't hesitate to hold us to a high standard.

athenadriver UML Class Diagram

For the contributors, the following is athenadriver Package's UML Class Diagram which may help you to understand the code. You can also check the reference section below for some useful materials.

athenadriver Package's UML Class Diagram

Reference

ChangeLog

v1.1.14 - Merge community contribution (August 19, 2022)

  • Adding default AWS SDK credential resolution to connector (@dfreiman-hbo, Dan Freiman)
  • Bump go-pretty version to most recent version (@nyergler, Nathan Yergler)
  • Expose DriverTracer factory functions (@andresmgot, Andres Martinez Gotor)
  • Add support to go 1.17+ (@henrywoo, Henry Fuheng Wu)
  • README cleanup (@henrywoo, Henry Fuheng Wu)

πŸ’‘ athenadriver and athenareader are created and maintained by Henry Fuheng Wu and brought to you by Uber Technologies.

More Repositories

1

react-vis

Data Visualization Components
JavaScript
8,657
star
2

baseweb

A React Component library implementing the Base design language
TypeScript
8,622
star
3

cadence

Cadence is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous long-running business logic in a scalable and resilient way.
Go
7,808
star
4

RIBs

Uber's cross-platform mobile architecture framework.
Kotlin
7,672
star
5

kraken

P2P Docker registry capable of distributing TBs of data in seconds
Go
5,848
star
6

prototool

Your Swiss Army Knife for Protocol Buffers
Go
5,051
star
7

causalml

Uplift modeling and causal inference with machine learning algorithms
Python
4,759
star
8

h3

Hexagonal hierarchical geospatial indexing system
C
4,591
star
9

NullAway

A tool to help eliminate NullPointerExceptions (NPEs) in your Java code with low build-time overhead
Java
3,525
star
10

AutoDispose

Automatic binding+disposal of RxJava streams.
Java
3,358
star
11

aresdb

A GPU-powered real-time analytics storage and query engine.
Go
2,983
star
12

react-digraph

A library for creating directed graph editors
JavaScript
2,583
star
13

piranha

A tool for refactoring code related to feature flag APIs
Java
2,222
star
14

orbit

A Python package for Bayesian forecasting with object-oriented design and probabilistic models under the hood.
Python
1,803
star
15

ios-snapshot-test-case

Snapshot view unit tests for iOS
Objective-C
1,770
star
16

petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Python
1,751
star
17

needle

Compile-time safe Swift dependency injection framework
Swift
1,749
star
18

manifold

A model-agnostic visual debugging tool for machine learning
JavaScript
1,636
star
19

okbuck

OkBuck is a gradle plugin that lets developers utilize the Buck build system on a gradle project.
Java
1,536
star
20

UberSignature

Provides an iOS view controller allowing a user to draw their signature with their finger in a realistic style.
Objective-C
1,283
star
21

nanoscope

An extremely accurate Android method tracing tool.
HTML
1,240
star
22

tchannel

network multiplexing and framing protocol for RPC
Thrift
1,150
star
23

queryparser

Parsing and analysis of Vertica, Hive, and Presto SQL.
Haskell
1,069
star
24

fiber

Distributed Computing for AI Made Simple
Python
1,037
star
25

neuropod

A uniform interface to run deep learning models from multiple frameworks
C++
929
star
26

uReplicator

Improvement of Apache Kafka Mirrormaker
Java
898
star
27

pam-ussh

uber's ssh certificate pam module
Go
832
star
28

ringpop-go

Scalable, fault-tolerant application-layer sharding for Go applications
Go
815
star
29

h3-js

h3-js provides a JavaScript version of H3, a hexagon-based geospatial indexing system.
JavaScript
801
star
30

mockolo

Efficient Mock Generator for Swift
Swift
776
star
31

xviz

A protocol for real-time transfer and visualization of autonomy data
JavaScript
760
star
32

h3-py

Python bindings for H3, a hierarchical hexagonal geospatial indexing system
Python
755
star
33

streetscape.gl

Visualization framework for autonomy and robotics data encoded in XVIZ
JavaScript
702
star
34

react-view

React View is an interactive playground, documentation and code generator for your components.
TypeScript
688
star
35

nebula.gl

A suite of 3D-enabled data editing overlays, suitable for deck.gl
TypeScript
665
star
36

RxDogTag

Automatic tagging of RxJava 2+ originating subscribe points for onError() investigation.
Java
645
star
37

peloton

Unified Resource Scheduler to co-schedule mixed types of workloads such as batch, stateless and stateful jobs in a single cluster for better resource utilization.
Go
636
star
38

motif

A simple DI API for Android / Java
Kotlin
530
star
39

signals-ios

Typeful eventing
Objective-C
526
star
40

tchannel-go

Go implementation of a multiplexing and framing protocol for RPC calls
Go
480
star
41

grafana-dash-gen

grafana dash dash dash gen
JavaScript
476
star
42

marmaray

Generic Data Ingestion & Dispersal Library for Hadoop
Java
473
star
43

zanzibar

A build system & configuration system to generate versioned API gateways.
Go
451
star
44

clay

Clay is a framework for building RESTful backend services using best practices. It’s a wrapper around Flask.
Python
441
star
45

astro

Astro is a tool for managing multiple Terraform executions as a single command
Go
430
star
46

NEAL

πŸ”ŽπŸž A language-agnostic linting platform
OCaml
424
star
47

react-vis-force

d3-force graphs as React Components.
JavaScript
401
star
48

arachne

An always-on framework that performs end-to-end functional network testing for reachability, latency, and packet loss
Go
387
star
49

cadence-web

Web UI for visualizing workflows on Cadence
JavaScript
377
star
50

Python-Sample-Application

Python
374
star
51

rides-ios-sdk

Uber Rides iOS SDK (beta)
Swift
367
star
52

stylist

A stylist creates cool styles. Stylist is a Gradle plugin that codegens a base set of Android XML themes.
Kotlin
355
star
53

storagetapper

StorageTapper is a scalable realtime MySQL change data streaming, logical backup and logical replication service
Go
334
star
54

swift-concurrency

Concurrency utilities for Swift
Swift
323
star
55

RemoteShuffleService

Remote shuffle service for Apache Spark to store shuffle data on remote servers.
Java
317
star
56

cyborg

Display Android Vectordrawables on iOS.
Swift
301
star
57

rides-android-sdk

Uber Rides Android SDK (beta)
Java
288
star
58

h3-go

Go bindings for H3, a hierarchical hexagonal geospatial indexing system
Go
282
star
59

h3-java

Java bindings for H3, a hierarchical hexagonal geospatial indexing system
Java
260
star
60

hermetic_cc_toolchain

Bazel C/C++ toolchain for cross-compiling C/C++ programs
Starlark
251
star
61

h3-py-notebooks

Jupyter notebooks for h3-py, a hierarchical hexagonal geospatial indexing system
Jupyter Notebook
244
star
62

geojson2h3

Conversion utilities between H3 indexes and GeoJSON
JavaScript
216
star
63

artist

An artist creates views. Artist is a Gradle plugin that codegens a base set of Android Views.
Kotlin
210
star
64

tchannel-node

JavaScript
205
star
65

RxCentralBle

A reactive, interface-driven central role Bluetooth LE library for Android
Java
198
star
66

uberalls

Track code coverage metrics with Jenkins and Phabricator
Go
187
star
67

SwiftCodeSan

SwiftCodeSan is a tool that "sanitizes" code written in Swift.
Swift
172
star
68

rides-python-sdk

Uber Rides Python SDK (beta)
Python
170
star
69

doubles

Test doubles for Python.
Python
165
star
70

logtron

A logging MACHINE
JavaScript
158
star
71

cadence-java-client

Java framework for Cadence Workflow Service
Java
139
star
72

cassette

Store and replay HTTP requests made in your Python app
Python
138
star
73

UBTokenBar

Flexible and extensible UICollectionView based TokenBar written in Swift
Swift
136
star
74

tchannel-java

A Java implementation of the TChannel protocol.
Java
133
star
75

bayesmark

Benchmark framework to easily compare Bayesian optimization methods on real machine learning tasks
Python
128
star
76

android-template

This template provides a starting point for open source Android projects at Uber.
Java
127
star
77

crumb

An annotation processor for breadcrumbing metadata across compilation boundaries.
Kotlin
122
star
78

py-find-injection

Look for SQL injection attacks in python source code
Python
119
star
79

rides-java-sdk

Uber Rides Java SDK (beta)
Java
102
star
80

startup-reason-reporter

Reports the reason why an iOS App started.
Objective-C
96
star
81

uber-poet

A mock swift project generator & build runner to help benchmark various module dependency graphs.
Python
95
star
82

cadence-java-samples

Java
94
star
83

charlatan

A Python library to efficiently manage and install database fixtures
Python
89
star
84

swift-abstract-class

Compile-time abstract class validation for Swift
Swift
83
star
85

simple-store

Simple yet performant asynchronous file storage for Android
Java
81
star
86

tchannel-python

Python implementation of the TChannel protocol.
Python
77
star
87

client-platform-engineering

A collection of cookbooks, scripts and binaries used to manage our macOS, Ubuntu and Windows endpoints
Ruby
72
star
88

eight-track

Record and playback HTTP requests
JavaScript
70
star
89

multidimensional_urlencode

Python library to urlencode a multidimensional dict
Python
67
star
90

lint-checks

A set of opinionated and useful lint checks
Kotlin
67
star
91

uncaught-exception

Handle uncaught exceptions.
JavaScript
66
star
92

swift-common

Common code used by various Uber open source projects
Swift
65
star
93

uberscriptquery

UberScriptQuery, a SQL-like DSL to make writing Spark jobs super easy
Java
58
star
94

sentry-logger

A Sentry transport for Winston
JavaScript
55
star
95

graph.gl

WebGL2-Powered Visualization Components for Graph Visualization
JavaScript
51
star
96

nanoscope-art

C++
48
star
97

assume-role-cli

CLI for AssumeRole is a tool for running programs with temporary credentials from AWS's AssumeRole API.
Go
47
star
98

airlock

A prober to probe HTTP based backends for health
JavaScript
47
star
99

mutornadomon

Easy-to-install monitor endpoint for Tornado applications
Python
46
star
100

kafka-logger

A kafka logger for winston
JavaScript
45
star