From 3ff5c27a535cb4878d38f44400bd06c5ee6a1243 Mon Sep 17 00:00:00 2001 From: Vlasta Hajek Date: Sun, 7 Jun 2020 13:31:43 +0200 Subject: [PATCH] Feature: Added streamed query support (#65) * Revert "Removed support for querying" This reverts commit bbacd3316618cc333a431d8ae30de3950b508456. * Revert "Removed querying from Readme" This reverts commit 4d46172b9c3eaba74233f319e9241d66906ec68d. * Revert "Removed word reading as query support has been already removed" This reverts commit 86d79c2f09e3303ca3234145f7eef9f4550d8482. * Revert "Removed word reading as query support has been already removed" This reverts commit 134f433d4ec55755473b91fcac8ba32561b031c3. * Adopted to client.query * feat: support for streamed querying, parsed flux result --- .gitignore | 2 + CHANGELOG.md | 2 + README.md | 213 +++- examples/QueryAggregated/QueryAggregated.ino | 163 ++++ examples/QueryTable/QueryTable.ino | 148 +++ .../SecureBatchWrite/SecureBatchWrite.ino | 29 +- examples/SecureWrite/SecureWrite.ino | 29 +- keywords.txt | 3 + library.properties | 4 +- src/InfluxDbClient.cpp | 148 ++- src/InfluxDbClient.h | 10 +- src/query/CsvReader.cpp | 111 +++ src/query/CsvReader.h | 51 + src/query/FluxParser.cpp | 274 ++++++ src/query/FluxParser.h | 109 +++ src/query/FluxTypes.cpp | 181 ++++ src/query/FluxTypes.h | 167 ++++ src/query/HttpStreamScanner.cpp | 103 ++ src/query/HttpStreamScanner.h | 64 ++ src/util/debug.h | 38 + src/util/helpers.cpp | 48 + src/util/helpers.h | 37 + test/TestSupport.cpp | 99 -- test/TestSupport.h | 118 ++- test/customSettings.h | 6 + test/server/Readme.md | 2 +- test/server/server.js | 147 ++- test/test.ino | 907 +++++++++++++++--- 28 files changed, 2824 insertions(+), 389 deletions(-) create mode 100644 examples/QueryAggregated/QueryAggregated.ino create mode 100644 examples/QueryTable/QueryTable.ino create mode 100644 src/query/CsvReader.cpp create mode 100644 src/query/CsvReader.h create mode 100644 src/query/FluxParser.cpp create mode 100644 src/query/FluxParser.h create mode 100644 src/query/FluxTypes.cpp create mode 100644 src/query/FluxTypes.h create mode 100644 src/query/HttpStreamScanner.cpp create mode 100644 src/query/HttpStreamScanner.h create mode 100644 src/util/debug.h create mode 100644 src/util/helpers.cpp create mode 100644 src/util/helpers.h delete mode 100644 test/TestSupport.cpp create mode 100644 test/customSettings.h diff --git a/.gitignore b/.gitignore index 76efb07..3ab6579 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ node_modules .vscode +*.txt +customSettings.h diff --git a/CHANGELOG.md b/CHANGELOG.md index 27fac52..54652e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ # Changelog +## Version 3.2 (in-production) +- [NEW] Added possibility to read data from InfluxDB using Flux queries ## Version 3.1.3 (2020-04-27) - [FIX] SecureWrite crash (#54) diff --git a/README.md b/README.md index 4be1ee8..fca2967 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # InfluxDB Arduino Client -Simple Arduino client for writing data to [InfluxDB](https://www.influxdata.com/products/influxdb-overview/), it doesn't matter whether a local server or InfluxDB Cloud. Library supports authentication, secure communication over TLS, [batching](#writing-in-batches), [automatic retrying](#buffer-handling-and-retrying) on server backpressure and connection failure. +Simple Arduino client for writing and reading data from [InfluxDB](https://www.influxdata.com/products/influxdb-overview/), it doesn't matter whether a local server or InfluxDB Cloud. 
Library supports authentication, secure communication over TLS, [batching](#writing-in-batches), [automatic retrying](#buffer-handling-and-retrying) on server backpressure and connection failure. It also allows setting data in various formats, automatically escapes special characters and offers specifying timestamp in various precisions. @@ -8,6 +8,27 @@ Library support both [InfluxDB 2](#basic-code-for-influxdb-2) and [InfluxDB 1](# This is a new implementation and API, [original API](#original-api) is still supported. +Supported devices: ESP8266 (2.7+) and ESP32 (1.0.3+). + +- [Basic code for InfluxDB 2](#basic-code-for-influxdb-2) +- [Basic code for InfluxDB 1](#basic-code-for-influxdb-1) +- [Connecting to InfluxDB Cloud 2](#connecting-to-influxdb-cloud-2) +- [Writing in Batches](#writing-in-batches) + - [Timestamp](#timestamp) + - [Configure Timel](#configure-time) + - [Batch Size](#batch-size) +- [Buffer Handling and Retrying](#buffer-handling-and-retrying) +- [Write Options](#write-options) +- [Secure Connection](#secure-connection) + - [InfluxDb 2](#influxdb-2) + - [InfluxDb 1](#influxdb-1) +- [Querying](#querying) +- [Original API](#original-api) +- [Troubleshooting](#troubleshooting) +- [Contributing](#contributing) +- [License](#license) + + ## Basic code for InfluxDB 2 Using client is very easy. After [seting up InfluxDB 2 server](https://v2.docs.influxdata.com/v2.0/get-started), first define connection parameters and a client instance: ```cpp @@ -48,7 +69,7 @@ Data can be seen in the InfluxDB UI immediately. Use [Data Explorer](https://v2. ## Basic code for InfluxDB 1 Using InfluxDB Arduino client for InfluxDB 1 is almost the same as for InfluxDB 2. The only difference is that InfluxDB 1 uses _database_ as classic name for data storage instead of bucket and the server is unsecured by default. -There is just different `InfluxDBClient contructor` and `setConnectionParametersV1` method for setting also security params. Everything else remains the same. +There is just different `InfluxDBClient contructor` and `setConnectionParametersV1` function for setting also security params. Everything else remains the same. ```cpp // InfluxDB server url, e.g. http://192.168.1.48:8086 (don't use localhost, always server name or ip address) @@ -102,11 +123,10 @@ Read more about [secure connection](#secure-connection). Additionally, time needs to be synced: ```cpp -// Synchronize UTC time with NTP servers +// Synchronize time with NTP servers and set timezone // Accurate time is necessary for certificate validaton and writing in batches -configTime(0, 0, "pool.ntp.org", "time.nis.gov"); -// Set timezone -setenv("TZ", TZ_INFO, 1); +// For the fastest time sync find NTP servers in your area: https://www.pool.ntp.org/zone/ +configTzTime(TZ_INFO "pool.ntp.org", "time.nis.gov"); ``` Read more about time synchronization in [Configure Time](#configure-time). @@ -135,7 +155,7 @@ If points have no timestamp assigned, InfluxDB assigns timestamp at the time of InfuxDB allows sending timestamp in various precisions - nanoseconds, microseconds, milliseconds or seconds. The milliseconds precision is usually enough for using on Arduino. -The client has to be configured with time precision. The default settings is not using the timestamp. The `setWriteOptions` methods allow setting various parameters and one of them is __write precision__: +The client has to be configured with time precision. The default settings is not using the timestamp. 
The `setWriteOptions` functions allow setting various parameters and one of them is __write precision__: ``` cpp // Set write precision to milliseconds. Leave other parameters default. client.setWriteOptions(WritePrecision::MS); @@ -149,43 +169,46 @@ If you want to manage timestamp on your own, there are several ways how to set t ### Configure Time -Dealing with timestamps requires the device has correctly set time. This can be done with just a few lines of code: +Dealing with timestamps, and also validating server or CA certificate, requires the device has correctly set time. This can be done with just one line of code: ```cpp -// Synchronize UTC time with NTP servers +// Synchronize time with NTP servers and set timezone // Accurate time is necessary for certificate validaton and writing in batches -configTime(0, 0, "pool.ntp.org", "time.nis.gov"); -// Set timezone -setenv("TZ", "PST8PDT", 1); +// For the fastest time sync find NTP servers in your area: https://www.pool.ntp.org/zone/ +configTzTime("PST8PDT", "pool.ntp.org", "time.nis.gov"); ``` -The `configTime` method starts the time synchronization with NTP servers. The first two parameters specify DST and timezone offset, but we keep them zero and configure timezone info later. +The `configTzTime` function starts the time synchronization with NTP servers. The first parameter specifies timezone information, which is important for distinguishing UTC and a local timezone and for daylight saving changes. The last two string parameters are the internet addresses of NTP servers. Check [pool.ntp.org](https://www.pool.ntp.org/zone) for address of some local NTP servers. -Using the `setenv` method with `TZ` param ensures a device has the correct timezone. This is critical for distinguishing UTC and a local timezone because timestamps of points must be set in the UTC timezone. -The second parameter is timezone information, which is described at [https://www.gnu.org/software/libc/manual/html_node/TZ-Variable.html](https://www.gnu.org/software/libc/manual/html_node/TZ-Variable.html). - +Timezone string details are described at [https://www.gnu.org/software/libc/manual/html_node/TZ-Variable.html](https://www.gnu.org/software/libc/manual/html_node/TZ-Variable.html). Values for some timezones: - Central Europe: `CET-1CEST,M3.5.0,M10.5.0/3` - Eastern: `EST5EDT` - Japanesse: `JST-9` - Pacific Time: `PST8PDT` -We can also set the timezone info (DST and UTC offset) in the first two parameters of the `configTime` method. - -There is also a method, which allows to set timezone string and NTP servers at the same time. It has a different name for ESP8266 and ESP32. It's declaration is following: +There is also another function for syncing the time, which takes timezone and DST offset. As DST info is set via static offset it will create local time problem when DST change will occur. 
+It's declaration is following: ```cpp -// For ESP8266 -void configTime(const char* tz, const char* server1, const char* server2 = nullptr, const char* server3 = nullptr); - -// For ESP32 -void configTzTime(const char* tz, const char* server1, const char* server2 = nullptr, const char* server3 = nullptr); +configTime(long gmtOffset_sec, int daylightOffset_sec, const char* server1, const char* server2 = nullptr, const char* server3 = nullptr); ``` -In the example code it would be (for ESP8266): + +In the example code it would be: ```cpp -// Synchronize UTC time with NTP servers +// Synchronize time with NTP servers // Accurate time is necessary for certificate validaton and writing in batches -configTime("PST8PDT", "pool.ntp.org", "time.nis.gov"); +configTime(3600, 3600, "pool.ntp.org", "time.nis.gov"); +``` + +Both `configTzTime` and `configTime` functions are asynchronous. This means that calling the functions just starts the time synchronization. Time is often not synchronized yet upon returning from call. + +There is a helper function `timeSync` provided with the this library. The function starts time synchronization by calling the `configTzTime` and waits maximum 20 seconds for time is synchronized. It prints progress info and final local time to the `Serial`. +`timeSync` has the same signature and `configTzTime` and it is included with the main header file `InfluxDbClient.h`: +```cpp +// Synchronize time with NTP servers and waits for completition. Prints waiting progress and final synchronized time to the Serial. +// Accurate time is necessary for certificate validion and writing points in batch +// For the fastest time sync find NTP servers in your area: https://www.pool.ntp.org/zone/ +void timeSync(const char *tzInfo, const char* ntpServer1, const char* ntpServer2 = nullptr, const char* ntpServer3 = nullptr); ``` -The way how the time synchronisation is shown in the library examples is chosen to have the most similar code for both currently supported devices. ### Batch Size Setting batch size depends on data gathering and DB updating strategy. @@ -195,7 +218,7 @@ For example, if you would like to see updates (on the dashboard or in processing In case that data should be written in longer periods and gathered data consists of several points batch size should be set to an expected number of points. -To set batch size we use [setWriteOptions](#write-options) method, where second parameter controls batch size: +To set batch size we use [setWriteOptions](#write-options) function, where second parameter controls batch size: ```cpp // Enable messages batching client.setWriteOptions(WritePrecision::MS, 10); @@ -216,27 +239,27 @@ if(!client.writePoint(point10)) { } ``` -In case of a number of points is not always the same, set batch size to the maximum number of points and use the `flushBuffer()` method to force writing to DB. See [Buffer Handling](#buffer-handling-and-retrying) for more details. +In case of a number of points is not always the same, set batch size to the maximum number of points and use the `flushBuffer()` function to force writing to DB. See [Buffer Handling](#buffer-handling-and-retrying) for more details. ## Buffer Handling and Retrying InfluxDB contains an underlying buffer for handling writing in batches and automatic retrying on server backpressure and connection failure. 
-Its size is controled by the 3rd parameter of [setWriteOptions](#write-options) method: +Its size is controled by the 3rd parameter of [setWriteOptions](#write-options) function: ```cpp // Enable messages batching client.setWriteOptions(WritePrecision::MS, 10, 30); ``` The third parameter specifies the buffer size. The recommended size is at least 2 x batch size. -State of the buffer can be determined via two methods: +State of the buffer can be determined via two functions: - `isBufferEmpty()` - Returns true if buffer is empty - `isBufferFull()` - Returns true if buffer is full Full buffer can occur when there is a problem with an internet connection or the InfluxDB server is overloaded. In such cases, points to write remains in buffer. When more points are added and connection problem remains, the buffer will reach the top and new points will overwrite older points. - Each attempt to write a point will try to send older points in the buffer. So, the `isBufferFull()` method can be used to skip low priority points. + Each attempt to write a point will try to send older points in the buffer. So, the `isBufferFull()` function can be used to skip low priority points. -The `flushBuffer()` method can be used to force writing, even the number of points in the buffer is lower than the batch size. With the help of the `isBufferEmpty()` method a check can be made before a device goes to sleep: +The `flushBuffer()` function can be used to force writing, even the number of points in the buffer is lower than the batch size. With the help of the `isBufferEmpty()` function a check can be made before a device goes to sleep: ```cpp // Check whether buffer in not empty @@ -246,14 +269,14 @@ The `flushBuffer()` method can be used to force writing, even the number of poin } ``` -Other methods for dealing with buffer: - - `checkBuffer()` - Checks point buffer status and flushes if the number of points reaches batch size or flush interval runs out. This main method for controlling buffer and it is used internally. +Other functions for dealing with buffer: + - `checkBuffer()` - Checks point buffer status and flushes if the number of points reaches batch size or flush interval runs out. This main function for controlling buffer and it is used internally. - `resetBuffer()` - Clears the buffer. Check [SecureBatchWrite example](examples/SecureBatchWrite/SecureBatchWrite.ino) for example code of buffer handling functions. 
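As a complement to the snippet above, here is a minimal sketch of the priority pattern described in this section. It assumes a configured `client` and two hypothetical points, `statusPoint` (important) and `debugPoint` (low priority); only calls documented in this README (`isBufferFull()`, `writePoint()`, `flushBuffer()`, `getLastErrorMessage()`) are used:
```cpp
// Low priority data is written only when the buffer still has room,
// so it never pushes out more important points
if (!client.isBufferFull()) {
    client.writePoint(debugPoint);   // hypothetical low priority point
}

// Important data is always handed to the client; on connection problems
// it stays in the buffer and is retried with the next write attempt
client.writePoint(statusPoint);      // hypothetical high priority point

// Before a longer idle period, force out whatever is still buffered
if (!client.flushBuffer()) {
    Serial.print("Forced flush failed: ");
    Serial.println(client.getLastErrorMessage());
}
```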
## Write Options -Writing points can be controlled via several parameters in `setWriteOptions` method: +Writing points can be controlled via several parameters in `setWriteOptions` function: | Parameter | Default Value | Meaning | |-----------|---------------|---------| @@ -321,7 +344,7 @@ There are two ways to set certificate or fingerprint to trust a server: // InfluxDB client instance with preconfigured InfluxCloud certificate InfluxDBClient client(INFLUXDB_URL, INFLUXDB_ORG, INFLUXDB_BUCKET, INFLUXDB_TOKEN, InfluxDbCloud2CACert); ``` -- Use `setConnectionParams` method: +- Use `setConnectionParams` function: ```cpp // InfluxDB client instance InfluxDBClient client; @@ -333,7 +356,7 @@ void setup() { ``` ### InfluxDb 1 -Use `setConnectionParamsV1` method: +Use `setConnectionParamsV1` function: ```cpp // InfluxDB client instance InfluxDBClient client; @@ -343,18 +366,94 @@ void setup() { client.setConnectionParamsV1(INFLUXDB_URL, INFLUXDB_DATABASE, INFLUXDB_USER, INFLUXDB_PASSWORD, InfluxDbCloud2CACert); } ``` +Another important prerequisity to sucessfully validate server or CA certificate is to have properly synchronized time. More on this in [Configure Timel](#configure-time). -## Troubleshooting -All db methods return status. Value `false` means something went wrong. Call `getLastErrorMessage()` to get the error message. +Note: Time synchronization is not required for validating server certificate via SHA1 fingerprint. -When error message doesn't help to explain the bad behavior, go to the library sources and in the file `src/InfluxDBClient.cpp` uncomment line 30: -```cpp -// Uncomment bellow in case of a problem and rebuild sketch -#define INFLUXDB_CLIENT_DEBUG +## Querying +InfluxDB 2 and InfluxDB 1.7+ (with [enabled flux](https://docs.influxdata.com/influxdb/latest/administration/config/#flux-enabled-false)) uses [Flux](https://www.influxdata.com/products/flux/) to process and query data. InfluxDB client for Arduino offers a simple, but powerful, way how to query data with `query` function. It parses response line by line, so it can read a huge responses (thousands data lines), without consuming a lot device memory. + +The `query` returns `FluxQueryResult` object, which parses response and provides useful getters for accessing values from result set. + +InfluxDB flux query result set is returned in the CSV format. In the example bellow, the first line contains type information and the second columns name and the rest is data: +```CSV +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string +,result,table,_start,_stop,_time,_value,SSID,_field,_measurement,device +,_result,0,2020-05-18T15:06:00.475253281Z,2020-05-19T15:06:00.475253281Z,2020-05-19T13:07:13Z,-55,667G,rssi,wifi_status,ESP32 +,_result,0,2020-05-18T15:06:00.475253281Z,2020-05-19T15:06:00.475253281Z,2020-05-19T13:07:27Z,-54,667G,rssi,wifi_status,ESP32 +,_result,0,2020-05-18T15:06:00.475253281Z,2020-05-19T15:06:00.475253281Z,2020-05-19T13:07:40Z,-54,667G,rssi,wifi_status,ESP32 +,_result,0,2020-05-18T15:06:00.475253281Z,2020-05-19T15:06:00.475253281Z,2020-05-19T13:07:54Z,-54,667G,rssi,wifi_status,ESP32 +,_result,0,2020-05-18T15:06:00.475253281Z,2020-05-19T15:06:00.475253281Z,2020-05-19T13:08:07Z,-55,667G,rssi,wifi_status,ESP32 +,_result,0,2020-05-18T15:06:00.475253281Z,2020-05-19T15:06:00.475253281Z,2020-05-19T13:08:20Z,-56,667G,rssi,wifi_status,ESP32 ``` -Then upload your sketch again and see the debug output in the Serial Monitor. 
-If you couldn't solve a problem by yourself, please, post an issue including the debug output. +Accessing data using `FluxQueryResult` requires knowing the query result structure, especially the name and the type of the column. The best practise is to tune query +in the `InfluxDB Data Explorer` and use the final query with this library. + + Browsing thought the result set is done by repeatedly calling the `next()` method, until it returns false. Unsuccesful reading is distinqushed by non empty value from the `getError()` method. + As a flux query result can contain several tables, differing by grouping key, use the `hasTableChanged()` method to know when there is a new table. + Single values are returned using the `getValueByIndex()` or `getValueByName()` methods. + All row values at once are retreived by the `getValues()` method. + Always call the `close()` method at the of reading. + +A value in the flux query result column, retrieved by the `getValueByIndex()` or `getValueByName()` methods, is represented by the `FluxValue` object. +It provides getter methods for supported flux types: + +| Flux type | Getter | C type | +| ----- | ------ | --- | +| long | getLong() | long | +| unsignedLong | getUnsignedLong() | unsingned long | +| dateTime:RFC3339, dateTime:RFC3339Nano | getDateTime() | [FluxDateTime](src/query/FluxTypes.h#L100) | +| bool | getBool() | bool | +| double | bool | double | +| string, base64binary, duration | getString() | String | + +Calling improper type getter will result in a zero (empty) value. + +Check for null (missing) value usig the `isNull()` method. + +Use the `getRawValue()` method for getting original string form. + +```cpp +// Construct a Flux query +// Query will find RSSI for last 24 hours for each connected WiFi network with this device computed by given selector function +String query = "from(bucket: \"my-bucket\") |> range(start: -24h) |> filter(fn: (r) => r._measurement == \"wifi_status\" and r._field == \"rssi\""; +query += "and r.device == \"ESP32\")"; +query += "|> max()"; + +// Send query to the server and get result +FluxQueryResult result = client.query(query); + +// Iterate over rows. Even there is just one row, next() must be called at least once. +while (result.next()) { + // Get typed value for flux result column 'SSID' + String ssid = result.getValueByName("SSID").getString(); + Serial.print("SSID '"); + Serial.print(ssid); + + Serial.print("' with RSSI "); + + // Get converted value for flux result column '_value' where there is RSSI value + long value = result.getValueByName("_value").getLong(); + Serial.print(value); + + // Format date-time for printing + // Format string according to http://www.cplusplus.com/reference/ctime/strftime/ + String timeStr = time.format("%F %T"); + + Serial.print(" at "); + Serial.print(timeStr); + + Serial.println(); +} + +// Check if there was an error +if(result.getError() != "") { + Serial.print("Query result error: "); + Serial.println(result.getError()); +} +``` +Complete source code is available in [QueryAggregated example](examples/QueryAggregated/QueryAggregated.ino). ## Original API @@ -420,3 +519,23 @@ influx.prepare(measurement3) // writes all prepared measurements with a single request into db. boolean success = influx.write(); ``` + +## Troubleshooting +All db methods return status. Value `false` means something went wrong. Call `getLastErrorMessage()` to get the error message. 
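A minimal sketch of this pattern, assuming a configured `client`, a `sensor` point as in the write examples above, and `query` holding any Flux query string. Note that query errors are reported separately, via `FluxQueryResult::getError()`:
```cpp
// Write path: check the returned status and print the last error
if (!client.writePoint(sensor)) {
    Serial.print("InfluxDB write failed: ");
    Serial.println(client.getLastErrorMessage());
}

// Query path: errors are reported by the result object itself
FluxQueryResult result = client.query(query);
while (result.next()) {
    // process rows
}
if (result.getError() != "") {
    Serial.print("InfluxDB query failed: ");
    Serial.println(result.getError());
}
result.close();
```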
+ +When error message doesn't help to explain the bad behavior, go to the library sources and in the file `src/InfluxDBClient.cpp` uncomment line 44: +```cpp +// Uncomment bellow in case of a problem and rebuild sketch +#define INFLUXDB_CLIENT_DEBUG +``` +Then upload your sketch again and see the debug output in the Serial Monitor. + +If you couldn't solve a problem by yourself, please, post an issue including the debug output. + +## Contributing + +If you would like to contribute code you can do through GitHub by forking the repository and sending a pull request into the `master` branch. + +## License + +The InfluxDB Arduino Client is released under the [MIT License](https://opensource.org/licenses/MIT). diff --git a/examples/QueryAggregated/QueryAggregated.ino b/examples/QueryAggregated/QueryAggregated.ino new file mode 100644 index 0000000..f2a88c3 --- /dev/null +++ b/examples/QueryAggregated/QueryAggregated.ino @@ -0,0 +1,163 @@ +/** + * QueryAggregated Example code for InfluxDBClient library for Arduino. + * + * This example demonstrates querying basic aggreagated statistic parameters of WiFi signal level measured and stored in BasicWrite and SecureWrite examples. + * + * Demonstrates connection to any InfluxDB instance accesible via: + * - unsecured http://... + * - secure https://... (appropriate certificate is required) + * - InfluxDB 2 Cloud at https://cloud2.influxdata.com/ (certificate is preconfigured) + * + * Enter WiFi and InfluxDB parameters below + **/ + +#if defined(ESP32) +#include +WiFiMulti wifiMulti; +#define DEVICE "ESP32" +#elif defined(ESP8266) +#include +ESP8266WiFiMulti wifiMulti; +#define DEVICE "ESP8266" +#endif + +#include +#include + +// WiFi AP SSID +#define WIFI_SSID "SSID" +// WiFi password +#define WIFI_PASSWORD "PASSWORD" +// InfluxDB v2 server url, e.g. https://eu-central-1-1.aws.cloud2.influxdata.com (Use: InfluxDB UI -> Load Data -> Client Libraries) +// InfluxDB 1.8+ (v2 compatibility API) server url, e.g. http://192.168.1.48:8086 +#define INFLUXDB_URL "server-url" +// InfluxDB v2 server or cloud API authentication token (Use: InfluxDB UI -> Load Data -> Tokens -> ) +// InfluxDB 1.8+ (v2 compatibility API) use form user:password, eg. 
admin:adminpass +#define INFLUXDB_TOKEN "server token" +// InfluxDB v2 organization name or id (Use: InfluxDB UI -> Settings -> Profile -> ) +// InfluxDB 1.8+ (v2 compatibility API) leave empty +#define INFLUXDB_ORG "org name/id" +// InfluxDB v2 bucket name (Use: InfluxDB UI -> Load Data -> Buckets) +// InfluxDB 1.8+ (v2 compatibility API) use database name +#define INFLUXDB_BUCKET "bucket name" + +// Set timezone string according to https://www.gnu.org/software/libc/manual/html_node/TZ-Variable.html +// Examples: +// Pacific Time: "PST8PDT" +// Eastern: "EST5EDT" +// Japanesse: "JST-9" +// Central Europe: "CET-1CEST,M3.5.0,M10.5.0/3" +#define TZ_INFO "CET-1CEST,M3.5.0,M10.5.0/3" + +// InfluxDB client instance with preconfigured InfluxCloud certificate +InfluxDBClient client(INFLUXDB_URL, INFLUXDB_ORG, INFLUXDB_BUCKET, INFLUXDB_TOKEN, InfluxDbCloud2CACert); + +void setup() { + Serial.begin(115200); + + // Setup wifi + WiFi.mode(WIFI_STA); + wifiMulti.addAP(WIFI_SSID, WIFI_PASSWORD); + + Serial.print("Connecting to wifi"); + while (wifiMulti.run() != WL_CONNECTED) { + Serial.print("."); + delay(500); + } + Serial.println(); + + // Accurate time is necessary for certificate validation + // For the fastest time sync find NTP servers in your area: https://www.pool.ntp.org/zone/ + // Syncing progress and the time will be printed to Serial + timeSync(TZ_INFO, "pool.ntp.org", "time.nis.gov"); + + // Check server connection + if (client.validateConnection()) { + Serial.print("Connected to InfluxDB: "); + Serial.println(client.getServerUrl()); + } else { + Serial.print("InfluxDB connection failed: "); + Serial.println(client.getLastErrorMessage()); + } +} + +void loop() { + // Construct a Flux query + // Query will list RSSI for last 24 hours for each connected WiFi network of this device type + String query = "from(bucket: \"" INFLUXDB_BUCKET "\") |> range(start: -24h) |> filter(fn: (r) => r._measurement == \"wifi_status\" and r._field == \"rssi\""; + query += " and r.device == \"" DEVICE "\")"; + + Serial.println("==== List results ===="); + + // Print composed query + Serial.print("Querying with: "); + Serial.println(query); + + // Send query to the server and get result + FluxQueryResult result = client.query(query); + + // Iterate over rows. Even there is just one row, next() must be called at least once. 
+ while (result.next()) { + // Check for new grouping key + if(result.hasTableChanged()) { + Serial.println("Table:"); + Serial.print(" "); + // Print all columns name + for(String &name: result.getColumnsName()) { + Serial.print(name); + Serial.print(","); + } + Serial.println(); + Serial.print(" "); + // Print all columns datatype + for(String &tp: result.getColumnsDatatype()) { + Serial.print(tp); + Serial.print(","); + } + Serial.println(); + } + Serial.print(" "); + // Print values of the row + for(FluxValue &val: result.getValues()) { + // Check wheter the value is null + if(!val.isNull()) { + // Use raw string, unconverted value + Serial.print(val.getRawValue()); + } else { + // Print null value substite + Serial.print(""); + } + Serial.print(","); + } + Serial.println(); + } + + // Check if there was an error + if(result.getError().length() > 0) { + Serial.print("Query result error: "); + Serial.println(result.getError()); + } + + // Close the result + result.close(); + + //Wait 10s + Serial.println("Wait 10s"); + delay(10000); +} + diff --git a/examples/SecureBatchWrite/SecureBatchWrite.ino b/examples/SecureBatchWrite/SecureBatchWrite.ino index 5ee746c..f245403 100644 --- a/examples/SecureBatchWrite/SecureBatchWrite.ino +++ b/examples/SecureBatchWrite/SecureBatchWrite.ino @@ -57,29 +57,6 @@ Point sensorStatus("wifi_status"); // Number for loops to sync time using NTP int iterations = 0; -void timeSync() { - // Synchronize UTC time with NTP servers - // Accurate time is necessary for certificate validaton and writing in batches - configTime(0, 0, "pool.ntp.org", "time.nis.gov"); - // Set timezone - setenv("TZ", TZ_INFO, 1); - - // Wait till time is synced - Serial.print("Syncing time"); - int i = 0; - while (time(nullptr) < 1000000000ul && i < 100) { - Serial.print("."); - delay(100); - i++; - } - Serial.println(); - - // Show time - time_t tnow = time(nullptr); - Serial.print("Synchronized time: "); - Serial.println(String(ctime(&tnow))); -} - void setup() { Serial.begin(115200); @@ -98,8 +75,10 @@ void setup() { sensorStatus.addTag("device", DEVICE); sensorStatus.addTag("SSID", WiFi.SSID()); - // Sync time for certificate validation - timeSync(); + // Accurate time is necessary for certificate validation and writing in batches + // For the fastest time sync find NTP servers in your area: https://www.pool.ntp.org/zone/ + // Syncing progress and the time will be printed to Serial. 
+ timeSync(TZ_INFO, "pool.ntp.org", "time.nis.gov"); // Check server connection if (client.validateConnection()) { diff --git a/examples/SecureWrite/SecureWrite.ino b/examples/SecureWrite/SecureWrite.ino index 376a592..53d7f09 100644 --- a/examples/SecureWrite/SecureWrite.ino +++ b/examples/SecureWrite/SecureWrite.ino @@ -51,29 +51,6 @@ InfluxDBClient client(INFLUXDB_URL, INFLUXDB_ORG, INFLUXDB_BUCKET, INFLUXDB_TOKE // Data point Point sensor("wifi_status"); -void timeSync() { - // Synchronize UTC time with NTP servers - // Accurate time is necessary for certificate validaton and writing in batches - configTime(0, 0, "pool.ntp.org", "time.nis.gov"); - // Set timezone - setenv("TZ", TZ_INFO, 1); - - // Wait till time is synced - Serial.print("Syncing time"); - int i = 0; - while (time(nullptr) < 1000000000ul && i < 100) { - Serial.print("."); - delay(100); - i++; - } - Serial.println(); - - // Show time - time_t tnow = time(nullptr); - Serial.print("Synchronized time: "); - Serial.println(String(ctime(&tnow))); -} - void setup() { Serial.begin(115200); @@ -92,8 +69,10 @@ void setup() { sensor.addTag("device", DEVICE); sensor.addTag("SSID", WiFi.SSID()); - // Sync time for certificate validation - timeSync(); + // Accurate time is necessary for certificate validation and writing in batches + // For the fastest time sync find NTP servers in your area: https://www.pool.ntp.org/zone/ + // Syncing progress and the time will be printed to Serial. + timeSync(TZ_INFO, "pool.ntp.org", "time.nis.gov"); // Check server connection if (client.validateConnection()) { diff --git a/keywords.txt b/keywords.txt index f8e413c..b1c7f2a 100644 --- a/keywords.txt +++ b/keywords.txt @@ -6,6 +6,9 @@ Point KEYWORD1 InfluxDBClient KEYWORD1 InfluxData KEYWORD1 Influxdb KEYWORD1 +FluxValue KEYWORD1 +FluxQueryResult KEYWORD1 +FluxDateTime KEYWORD1 # Methods and Functions (KEYWORD2) addTag KEYWORD2 diff --git a/library.properties b/library.properties index fddb0d7..b903dca 100644 --- a/library.properties +++ b/library.properties @@ -1,10 +1,10 @@ name=ESP8266 Influxdb version=3.1.3 -author=Tobias Schürg, Influxdata +author=Tobias Schürg, InfluxData maintainer=Tobias Schürg, InfluxData sentence=InfluxDB Client for Arduino. url=https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino -paragraph=This library allows writing data to InfluxDB server or InfluxDB Cloud. Supports authentication, secure communication over TLS, batching and retrying. +paragraph=This library allows writing and reading data from InfluxDB server or InfluxDB Cloud. Supports authentication, secure communication over TLS, batching and retrying. category=Data Storage architectures=* includes=InfluxDbClient.h diff --git a/src/InfluxDbClient.cpp b/src/InfluxDbClient.cpp index e4a79d5..0317667 100644 --- a/src/InfluxDbClient.cpp +++ b/src/InfluxDbClient.cpp @@ -24,8 +24,8 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ -#include "InfluxDbClient.h" #include +#include "InfluxDbClient.h" #define STRHELPER(x) #x #define STR(x) STRHELPER(x) // stringifier @@ -43,15 +43,12 @@ static const char UserAgent[] PROGMEM = "influxdb-client-arduino/" INFLUXDB_CLIE // Uncomment bellow in case of a problem and rebuild sketch //#define INFLUXDB_CLIENT_DEBUG -#ifdef INFLUXDB_CLIENT_DEBUG -# define INFLUXDB_CLIENT_DEBUG(fmt, ...) Serial.printf_P( (PGM_P)PSTR(fmt), ## __VA_ARGS__ ) -#else -# define INFLUXDB_CLIENT_DEBUG(fmt, ...) 
-#endif +#include "util/debug.h" static const char UnitialisedMessage[] PROGMEM = "Unconfigured instance"; -// This cannot be put to PROGMEM due to the way how it used +// This cannot be put to PROGMEM due to the way how it is used static const char RetryAfter[] = "Retry-After"; +static const char TransferEnconding[] = "Transfer-Encoding"; static String escapeKey(String key); static String escapeValue(const char *value); @@ -195,6 +192,12 @@ void InfluxDBClient::setConnectionParamsV1(const char *serverUrl, const char *db } bool InfluxDBClient::init() { + INFLUXDB_CLIENT_DEBUG(F("Init\n")); + INFLUXDB_CLIENT_DEBUG(F(" Server url: %s\n"), _serverUrl.c_str()); + INFLUXDB_CLIENT_DEBUG(F(" Org: %s\n"), _org.c_str()); + INFLUXDB_CLIENT_DEBUG(F(" Bucket: %s\n"), _bucket.c_str()); + INFLUXDB_CLIENT_DEBUG(F(" Token: %s\n"), _authToken.c_str()); + INFLUXDB_CLIENT_DEBUG(F(" DB version: %d\n"), _dbVersion); if(_serverUrl.length() == 0 || (_dbVersion == 2 && (_org.length() == 0 || _bucket.length() == 0 || _authToken.length() == 0))) { INFLUXDB_CLIENT_DEBUG("[E] Invalid parameters\n"); return false; @@ -262,17 +265,39 @@ void InfluxDBClient::clean() { } void InfluxDBClient::setUrls() { + INFLUXDB_CLIENT_DEBUG(F("setUrls\n")); if(_dbVersion == 2) { - _writeUrl = _serverUrl + "/api/v2/write?org=" + _org + "&bucket=" + _bucket; + _writeUrl = _serverUrl; + _writeUrl += "/api/v2/write?org="; + _writeUrl += _org ; + _writeUrl += "&bucket="; + _writeUrl += _bucket; + INFLUXDB_CLIENT_DEBUG(F(" writeUrl: %s\n"), _writeUrl.c_str()); + _queryUrl = _serverUrl; + _queryUrl += "/api/v2/query?org="; + _queryUrl += _org; + INFLUXDB_CLIENT_DEBUG(F(" queryUrl: %s\n"), _queryUrl.c_str()); } else { - _writeUrl = _serverUrl + "/write?db=" + _bucket; + _writeUrl = _serverUrl; + _writeUrl += "/write?db="; + _writeUrl += _bucket; + _queryUrl = _serverUrl; + _queryUrl += "/api/v2/query"; if(_user.length() > 0 && _password.length() > 0) { - String auth = "&u=" + _user + "&p=" + _password; - _writeUrl += auth; + String auth = "&u="; + auth += _user; + auth += "&p="; + auth += _password; + _writeUrl += auth; + _queryUrl += auth; } + INFLUXDB_CLIENT_DEBUG(F(" writeUrl: %s\n"), _writeUrl.c_str()); + INFLUXDB_CLIENT_DEBUG(F(" queryUrl: %s\n"), _queryUrl.c_str()); } if(_writePrecision != WritePrecision::NoTime) { - _writeUrl += String("&precision=") + precisionToString(_writePrecision, _dbVersion); + _writeUrl += "&precision="; + _writeUrl += precisionToString(_writePrecision, _dbVersion); + INFLUXDB_CLIENT_DEBUG(F(" writeUrl: %s\n"), _writeUrl.c_str()); } } @@ -500,8 +525,8 @@ void InfluxDBClient::preRequest() { if(_authToken.length() > 0) { _httpClient.addHeader(F("Authorization"), "Token " + _authToken); } - const char * headerKeys[] = {RetryAfter} ; - _httpClient.collectHeaders(headerKeys, 1); + const char * headerKeys[] = {RetryAfter, TransferEnconding} ; + _httpClient.collectHeaders(headerKeys, 2); } int InfluxDBClient::postData(const char *data) { @@ -532,6 +557,64 @@ int InfluxDBClient::postData(const char *data) { return _lastStatusCode; } + + +static const char QueryDialect[] PROGMEM = "\ +\"dialect\": {\ +\"annotations\": [\ +\"datatype\"\ +],\ +\"dateTimeFormat\": \"RFC3339\",\ +\"header\": true,\ +\"delimiter\": \",\",\ +\"commentPrefix\": \"#\"\ +}}"; + +FluxQueryResult InfluxDBClient::query(String fluxQuery) { + if(_lastRetryAfter > 0 && (millis()-_lastRequestTime)/1000 < _lastRetryAfter) { + // retry after period didn't run out yet + return FluxQueryResult("Too early"); + } + if(!_wifiClient && !init()) { + 
_lastStatusCode = 0; + _lastErrorResponse = FPSTR(UnitialisedMessage); + return FluxQueryResult(_lastErrorResponse); + } + INFLUXDB_CLIENT_DEBUG("[D] Query to %s\n", _queryUrl.c_str()); + if(!_httpClient.begin(*_wifiClient, _queryUrl)) { + INFLUXDB_CLIENT_DEBUG("[E] begin failed\n"); + return FluxQueryResult("");; + } + _httpClient.addHeader(F("Content-Type"), F("application/json")); + + preRequest(); + + INFLUXDB_CLIENT_DEBUG("[D] JSON query:\n%s\n", fluxQuery.c_str()); + + String body = F("{\"type\":\"flux\",\"query\":\""); + body += escapeJSONString(fluxQuery) + "\","; + body += FPSTR(QueryDialect); + + _lastStatusCode = _httpClient.POST(body); + + postRequest(200); + if(_lastStatusCode == 200) { + bool chunked = false; + if(_httpClient.hasHeader(TransferEnconding)) { + String header = _httpClient.header(TransferEnconding); + chunked = header.equalsIgnoreCase("chunked"); + } + INFLUXDB_CLIENT_DEBUG("[D] chunked: %s\n", chunked?"true":"false"); + HttpStreamScanner *scanner = new HttpStreamScanner(&_httpClient, chunked); + CsvReader *reader = new CsvReader(scanner); + + return FluxQueryResult(reader); + } else { + _httpClient.end(); + return FluxQueryResult(_lastErrorResponse); + } +} + void InfluxDBClient::postRequest(int expectedStatusCode) { _lastRequestTime = millis(); INFLUXDB_CLIENT_DEBUG("[D] HTTP status code - %d\n", _lastStatusCode); @@ -618,3 +701,40 @@ static String escapeTagValue(const char *value) { } return ret; } + +static String escapeJSONString(String &value) { + String ret; + int d = 0; + int i,from = 0; + while((i = value.indexOf('"',from)) > -1) { + d++; + if(i == value.length()-1) { + break; + } + from = i+1; + } + ret.reserve(value.length()+d); //most probably we will escape just double quotes + for (char c: value) + { + switch (c) + { + case '"': ret += "\\\""; break; + case '\\': ret += "\\\\"; break; + case '\b': ret += "\\b"; break; + case '\f': ret += "\\f"; break; + case '\n': ret += "\\n"; break; + case '\r': ret += "\\r"; break; + case '\t': ret += "\\t"; break; + default: + if ('\x00' <= c && c <= '\x1f') { + ret += "\\u"; + char buf[3 + 8 * sizeof(unsigned int)]; + sprintf(buf, "\\u%04u", c); + ret += buf; + } else { + ret += c; + } + } + } + return ret; +} diff --git a/src/InfluxDbClient.h b/src/InfluxDbClient.h index 78ee011..3201335 100644 --- a/src/InfluxDbClient.h +++ b/src/InfluxDbClient.h @@ -29,7 +29,9 @@ #define INFLUXDB_CLIENT_VERSION "3.1.3" -#include "Arduino.h" +#include +#include "query/FluxParser.h" +#include "util/helpers.h" #if defined(ESP8266) # include @@ -165,6 +167,10 @@ class InfluxDBClient { // Writes record represented by Point to buffer // Returns true if successful, false in case of any error bool writePoint(Point& point); + // Sends Flux query and returns FluxQueryResult object for subsequentialy reading flux query response. + // Use FluxQueryResult::next() method to iterate over lines of the query result. + // Always call of FluxQueryResult::close() when reading is finished. Check FluxQueryResult doc for more info. + FluxQueryResult query(String fluxQuery); // Writes all points in buffer, with respect to the batch size, and in case of success clears the buffer. // Returns true if successful, false in case of any error bool flushBuffer(); @@ -205,6 +211,8 @@ class InfluxDBClient { String _password; // Cached full write url String _writeUrl; + // Cached full query url + String _queryUrl; // Points timestamp precision. 
WritePrecision _writePrecision = WritePrecision::NoTime; // Number of points that will be written to the databases at once. diff --git a/src/query/CsvReader.cpp b/src/query/CsvReader.cpp new file mode 100644 index 0000000..0de020d --- /dev/null +++ b/src/query/CsvReader.cpp @@ -0,0 +1,111 @@ +/** + * + * CsvReader.cpp: Simple Csv parser for comma separated values, with double quotes suppport + * + * MIT License + * + * Copyright (c) 2020 InfluxData + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. +*/ +#include "CsvReader.h" +// Uncomment bellow in case of a problem and rebuild sketch +#define INFLUXDB_CLIENT_DEBUG +#include "util/debug.h" + +CsvReader::CsvReader(HttpStreamScanner *scanner) { + _scanner = scanner; +} + +CsvReader::~CsvReader() { + delete _scanner; +} + +std::vector CsvReader::getRow() { + return _row; +}; + +void CsvReader::close() { + clearRow(); + _scanner->close(); +} + +void CsvReader::clearRow() { + std::for_each(_row.begin(), _row.end(), [](String &value){ value = (const char *)nullptr; }); + _row.clear(); +} + +enum class CsvParsingState { + UnquotedField, + QuotedField, + QuotedQuote +}; + +bool CsvReader::next() { + clearRow(); + bool status = _scanner->next(); + if(!status) { + _error = _scanner->getError(); + return false; + } + String line = _scanner->getLine(); + CsvParsingState state = CsvParsingState::UnquotedField; + std::vector fields {""}; + size_t i = 0; // index of the current field + for (char c : line) { + switch (state) { + case CsvParsingState::UnquotedField: + switch (c) { + case ',': // end of field + fields.push_back(""); i++; + break; + case '"': state = CsvParsingState::QuotedField; + break; + default: fields[i] += c; + break; + } + break; + case CsvParsingState::QuotedField: + switch (c) { + case '"': state = CsvParsingState::QuotedQuote; + break; + default: fields[i] += c; + break; + } + break; + case CsvParsingState::QuotedQuote: + switch (c) { + case ',': // , after closing quote + fields.push_back(""); i++; + state = CsvParsingState::UnquotedField; + break; + case '"': // "" -> " + fields[i] += '"'; + state = CsvParsingState::QuotedField; + break; + default: // end of quote + state = CsvParsingState::UnquotedField; + break; + } + break; + } + } + _row = fields; + return true; +} \ No newline at end of file diff --git a/src/query/CsvReader.h b/src/query/CsvReader.h new file mode 100644 index 0000000..9dc3057 --- /dev/null +++ b/src/query/CsvReader.h @@ -0,0 +1,51 @@ +/** + * + * CsvReader.h: Simple Csv parser for 
comma separated values, with double quotes suppport + * + * MIT License + * + * Copyright (c) 2020 InfluxData + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. +*/ +#ifndef _CSV_READER_ +#define _CSV_READER_ + +#include "HttpStreamScanner.h" +#include + +/** + * CsvReader parses csv line to token by ',' (comma) character. + * It suppports escaped quotes, excaped comma + **/ +class CsvReader { +public: + CsvReader(HttpStreamScanner *scanner); + ~CsvReader(); + bool next(); + void close(); + std::vector getRow(); + int getError() const { return _error; }; +private: + void clearRow(); + HttpStreamScanner *_scanner = nullptr; + std::vector _row; + int _error = 0; +}; +#endif //_CSV_READER_ \ No newline at end of file diff --git a/src/query/FluxParser.cpp b/src/query/FluxParser.cpp new file mode 100644 index 0000000..23ef108 --- /dev/null +++ b/src/query/FluxParser.cpp @@ -0,0 +1,274 @@ +/** + * + * FluxParser.cpp: InfluxDB flux query result parser + * + * MIT License + * + * Copyright (c) 2018-2020 InfluxData + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+*/ + +#include "FluxParser.h" +// Uncomment bellow in case of a problem and rebuild sketch +//#define INFLUXDB_CLIENT_DEBUG +#include "util/debug.h" + +FluxQueryResult::FluxQueryResult(CsvReader *reader) { + _data = std::make_shared(reader); +} + +FluxQueryResult::FluxQueryResult(String error):FluxQueryResult((CsvReader *)nullptr) { + _data->_error = error; +} + +FluxQueryResult::FluxQueryResult(const FluxQueryResult &other) { + _data = other._data; +} +FluxQueryResult &FluxQueryResult::operator=(const FluxQueryResult &other) { + if(this != &other) { + _data = other._data; + } + return *this; +} + +FluxQueryResult::~FluxQueryResult() { +} + +int FluxQueryResult::getColumnIndex(String columnName) { + int i = -1; + std::vector::iterator it = find(_data->_columnNames.begin(), _data->_columnNames.end(), columnName); + if (it != _data->_columnNames.end()) { + i = distance(_data->_columnNames.begin(), it); + } + return i; +} + +FluxValue FluxQueryResult::getValueByIndex(int index) { + FluxValue ret; + if(index >= 0 && index < _data->_columnValues.size()) { + ret = _data->_columnValues[index]; + } + return ret; +} + +FluxValue FluxQueryResult::getValueByName(String columnName) { + FluxValue ret; + int i = getColumnIndex(columnName); + if(i > -1) { + ret = getValueByIndex(i); + } + return ret; +} + +void FluxQueryResult::close() { + clearValues(); + clearColumns(); + if(_data->_reader) { + _data->_reader->close(); + } +} + +void FluxQueryResult::clearValues() { + std::for_each(_data->_columnValues.begin(), _data->_columnValues.end(), [](FluxValue &value){ value = nullptr; }); + _data->_columnValues.clear(); +} + +void FluxQueryResult::clearColumns() { + std::for_each(_data->_columnNames.begin(), _data->_columnNames.end(), [](String &value){ value = (const char *)nullptr; }); + _data->_columnNames.clear(); + + std::for_each(_data->_columnDatatypes.begin(), _data->_columnDatatypes.end(), [](String &value){ value = (const char *)nullptr; }); + _data->_columnDatatypes.clear(); +} + +FluxQueryResult::Data::Data(CsvReader *reader):_reader(reader) {} + +FluxQueryResult::Data::~Data() { + delete _reader; +} + +enum ParsingState { + ParsingStateNormal = 0, + ParsingStateNameRow, + ParsingStateError +}; + +bool FluxQueryResult::next() { + if(!_data->_reader) { + return false; + } + ParsingState parsingState = ParsingStateNormal; + _data->_tableChanged = false; + clearValues(); + _data->_error = ""; +readRow: + bool stat = _data->_reader->next(); + if(!stat) { + if(_data->_reader->getError()< 0) { + _data->_error = HTTPClient::errorToString(_data->_reader->getError()); + INFLUXDB_CLIENT_DEBUG(F("Error '%s'\n"), _data->_error.c_str()); + } + return false; + } + std::vector vals = _data->_reader->getRow(); + INFLUXDB_CLIENT_DEBUG(F("[D] FluxQueryResult: vals.size %d\n"), vals.size()); + if(vals.size() < 2) { + goto readRow; + } + if(vals[0] == "") { + if (parsingState == ParsingStateError) { + String message ; + if (vals.size() > 1 && vals[1].length() > 0) { + message = vals[1]; + } else { + message = F("Unknown query error"); + } + String reference = ""; + if (vals.size() > 2 && vals[2].length() > 0) { + reference = "," + vals[2]; + } + _data->_error = message + reference; + INFLUXDB_CLIENT_DEBUG(F("Error '%s'\n"), _data->_error.c_str()); + return false; + } else if (parsingState == ParsingStateNameRow) { + if (vals[1] == "error") { + parsingState = ParsingStateError; + } else { + if (vals.size()-1 != _data->_columnDatatypes.size()) { + _data->_error = String(F("Parsing error, header has different number of 
columns than table: ")) + String(vals.size()-1) + " vs " + String(_data->_columnDatatypes.size()); + INFLUXDB_CLIENT_DEBUG(F("Error '%s'\n"), _data->_error.c_str()); + return false; + } else { + for(int i=1;i < vals.size(); i++) { + _data->_columnNames.push_back(vals[i]); + } + } + parsingState = ParsingStateNormal; + } + goto readRow; + } + if(_data->_columnDatatypes.size() == 0) { + _data->_error = F("Parsing error, datatype annotation not found"); + INFLUXDB_CLIENT_DEBUG(F("Error '%s'\n"), _data->_error.c_str()); + return false; + } + if (vals.size()-1 != _data->_columnNames.size()) { + _data->_error = String(F("Parsing error, row has different number of columns than table: ")) + String(vals.size()-1) + " vs " + String(_data->_columnNames.size()); + INFLUXDB_CLIENT_DEBUG(F("Error '%s'\n"), _data->_error.c_str()); + return false; + } + for(int i=1;i < vals.size(); i++) { + FluxBase *v = nullptr; + if(vals[i].length() > 0) { + v = convertValue(vals[i], _data->_columnDatatypes[i-1]); + if(!v) { + _data->_error = String(F("Unsupported datatype: ")) + _data->_columnDatatypes[i-1]; + INFLUXDB_CLIENT_DEBUG(F("Error '%s'\n"), _data->_error.c_str()); + return false; + } + } + FluxValue val(v); + _data->_columnValues.push_back(val); + } + } else if(vals[0] == "#datatype") { + _data->_tablePosition++; + clearColumns(); + _data->_tableChanged = true; + for(int i=1;i < vals.size(); i++) { + _data->_columnDatatypes.push_back(vals[i]); + } + parsingState = ParsingStateNameRow; + goto readRow; + } + return true; +} + +FluxDateTime *FluxQueryResult::convertRfc3339(String value, const char *type) { + FluxDateTime *ft = nullptr; + tm t = {0,0,0,0,0,0,0,0,0}; + // has the time part + int zet = value.indexOf('Z'); + unsigned long fracts = 0; + if(value.indexOf('T') > 0 && zet > 0) { //Full datetime string - 2020-05-22T11:25:22.037735433Z + int f = sscanf(value.c_str(),"%d-%d-%dT%d:%d:%d", &t.tm_year,&t.tm_mon,&t.tm_mday, &t.tm_hour,&t.tm_min,&t.tm_sec); + if(f != 6) { + return nullptr; + } + t.tm_year -= 1900; //adjust to years after 1900 + t.tm_mon -= 1; //adjust to range 0-11 + int dot = value.indexOf('.'); + + if(dot > 0) { + int tail = zet; + int len = zet-dot-1; + if (len > 6) { + tail = dot + 7; + len = 6; + } + String secParts = value.substring(dot+1, tail); + fracts = strtoul((const char *) secParts.c_str(), NULL, 10); + if(len < 6) { + fracts *= 10^(6-len); + } + } + } else { + int f = sscanf(value.c_str(),"%d-%d-%d", &t.tm_year,&t.tm_mon,&t.tm_mday); + if(f != 3) { + return nullptr; + } + t.tm_year -= 1900; //adjust to years after 1900 + t.tm_mon -= 1; //adjust to range 0-11 + } + return new FluxDateTime(value, type, t, fracts); +} + +FluxBase *FluxQueryResult::convertValue(String value, String dataType) { + FluxBase *ret = nullptr; + if(dataType.equals(FluxDatatypeDatetimeRFC3339) || dataType.equals(FluxDatatypeDatetimeRFC3339Nano)) { + const char *type = FluxDatatypeDatetimeRFC3339; + if(dataType.equals(FluxDatatypeDatetimeRFC3339Nano)) { + type = FluxDatatypeDatetimeRFC3339Nano; + } + ret = convertRfc3339(value, type); + if (!ret) { + _data->_error = String(F("Invalid value for '")) + dataType + F("': ") + value; + } + } else if(dataType.equals(FluxDatatypeDouble)) { + double val = strtod((const char *) value.c_str(), NULL); + ret = new FluxDouble(value, val); + } else if(dataType.equals(FluxDatatypeBool)) { + bool val = value.equalsIgnoreCase("true"); + ret = new FluxBool(value, val); + } else if(dataType.equals(FluxDatatypeLong)) { + long l = strtol((const char *) value.c_str(), NULL, 10); + 
ret = new FluxLong(value, l); + } else if(dataType.equals(FluxDatatypeUnsignedLong)) { + unsigned long ul = strtoul((const char *) value.c_str(), NULL, 10); + ret = new FluxUnsignedLong(value, ul); + } else if(dataType.equals(FluxBinaryDataTypeBase64)) { + ret = new FluxString(value, FluxBinaryDataTypeBase64); + } else if(dataType.equals(FluxDatatypeDuration)) { + ret = new FluxString(value, FluxDatatypeDuration); + } else if(dataType.equals(FluxDatatypeString)) { + ret = new FluxString(value, FluxDatatypeString); + } + return ret; +} + diff --git a/src/query/FluxParser.h b/src/query/FluxParser.h new file mode 100644 index 0000000..84e1326 --- /dev/null +++ b/src/query/FluxParser.h @@ -0,0 +1,109 @@ +/** + * + * FLuxParser.h: InfluxDB flux query result parser + * + * MIT License + * + * Copyright (c) 2020 InfluxData + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. +*/ +#ifndef _FLUX_PARSER_H_ +#define _FLUX_PARSER_H_ + +#include +#include "CsvReader.h" +#include "FluxTypes.h" + + +/** + * FluxQueryResult represents result from InfluxDB flux query. + * It parses stream from server, line by line, so it allows to read a huge responses. + * + * Browsing thought the result is done by repeatedly calling the next() method, until it returns false. + * Unsuccesful reading is distinqushed by non empty value from getError(). + * + * As a flux query result can contain several tables differing by grouping key, use hasTableChanged() to + * know when there is a new table. + * + * Single values are returned using getValueByIndex() or getValueByName() methods. + * All row values are retreived by getValues(). + * + * Always call close() at the of reading. + * + * FluxQueryResult supports passing by value. + */ +class FluxQueryResult { +public: + // Constructor for reading result + FluxQueryResult(CsvReader *reader); + // Constructor for error result + FluxQueryResult(String error); + // Copy constructor + FluxQueryResult(const FluxQueryResult &other); + // Assignment operator + FluxQueryResult &operator=(const FluxQueryResult &other); + // Advances to next values row in the result set. + // Returns true on successful reading new row, false means end of the result set + // or an error. 
+    // or an error. Call getError() to distinguish: a non-empty value means an error occurred.
+    bool next();
+    // Returns the index of the column, or -1 if not found
+    int getColumnIndex(String columnName);
+    // Returns a converted value by index, or a null value in case of a missing value or wrong index
+    FluxValue getValueByIndex(int index);
+    // Returns a result value by column name, or a null value in case of a missing value or wrong column name
+    FluxValue getValueByName(String columnName);
+    // Returns flux datatypes of all columns
+    std::vector<String> getColumnsDatatype() { return _data->_columnDatatypes; }
+    // Returns names of all columns
+    std::vector<String> getColumnsName() { return _data->_columnNames; }
+    // Returns all values from the current row
+    std::vector<FluxValue> getValues() { return _data->_columnValues; }
+    // Returns true if a new table was encountered
+    bool hasTableChanged() const { return _data->_tableChanged; }
+    // Returns the current table position in the result set
+    int getTablePosition() const { return _data->_tablePosition; }
+    // Returns an error found during parsing if any, otherwise an empty string
+    String getError() { return _data->_error; }
+    // Releases all resources and closes the server response. It must always be called at the end of reading.
+    void close();
+    // Destructor
+    ~FluxQueryResult();
+protected:
+    FluxBase *convertValue(String value, String dataType);
+    static FluxDateTime *convertRfc3339(String value, const char *type);
+    void clearValues();
+    void clearColumns();
+private:
+    class Data {
+    public:
+        Data(CsvReader *reader);
+        ~Data();
+        CsvReader *_reader;
+        int _tablePosition = -1;
+        bool _tableChanged = false;
+        std::vector<String> _columnDatatypes;
+        std::vector<String> _columnNames;
+        std::vector<FluxValue> _columnValues;
+        String _error;
+    };
+    std::shared_ptr<Data> _data;
+};
+
+#endif // _FLUX_PARSER_H_
diff --git a/src/query/FluxTypes.cpp b/src/query/FluxTypes.cpp
new file mode 100644
index 0000000..9b18c03
--- /dev/null
+++ b/src/query/FluxTypes.cpp
@@ -0,0 +1,181 @@
+/**
+ *
+ * FluxTypes.cpp: InfluxDB flux query types support
+ *
+ * MIT License
+ *
+ * Copyright (c) 2018-2020 InfluxData
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+*/ + +#include "FluxTypes.h" + +const char *FluxDatatypeString = "string"; +const char *FluxDatatypeDouble = "double"; +const char *FluxDatatypeBool = "bool"; +const char *FluxDatatypeLong = "long"; +const char *FluxDatatypeUnsignedLong = "unsignedLong"; +const char *FluxDatatypeDuration = "duration"; +const char *FluxBinaryDataTypeBase64 = "base64Binary"; +const char *FluxDatatypeDatetimeRFC3339 = "dateTime:RFC3339"; +const char *FluxDatatypeDatetimeRFC3339Nano = "dateTime:RFC3339Nano"; + +FluxBase::FluxBase(String rawValue) { + _rawValue = rawValue; +} + +FluxBase::~FluxBase() { +} + + +FluxLong::FluxLong(String rawValue, long value):FluxBase(rawValue),value(value) { + +} + +const char *FluxLong::getType() { + return FluxDatatypeLong; +} + +FluxUnsignedLong::FluxUnsignedLong(String rawValue, unsigned long value):FluxBase(rawValue),value(value) { +} + +const char *FluxUnsignedLong::getType() { + return FluxDatatypeUnsignedLong; +} + +FluxDouble::FluxDouble(String rawValue, double value):FluxBase(rawValue),value(value) { + +} + +const char *FluxDouble::getType() { + return FluxDatatypeDouble; +} + +FluxBool::FluxBool(String rawValue, bool value):FluxBase(rawValue),value(value) { +} + +const char *FluxBool::getType() { + return FluxDatatypeBool; +} + + +FluxDateTime::FluxDateTime(String rawValue, const char *type, struct tm value, unsigned long microseconds):FluxBase(rawValue),_type(type),value(value), microseconds(microseconds) { + +} + +const char *FluxDateTime::getType() { + return _type; +} + +String FluxDateTime::format(String formatString) { + int len = formatString.length() + 20; //+20 for safety + char *buff = new char[len]; + strftime(buff,len, formatString.c_str(),&value); + String str = buff; + delete [] buff; + return str; +} + +FluxString::FluxString(String rawValue, const char *type):FluxBase(rawValue),_type(type),value(_rawValue) { + +} + +const char *FluxString::getType() { + return _type; +} + + +FluxValue::FluxValue() {} + +FluxValue::FluxValue(FluxBase *fluxValue):_data(fluxValue) { + +} + +FluxValue::FluxValue(const FluxValue &other) { + _data = other._data; +} + +FluxValue& FluxValue::operator=(const FluxValue& other) { + if(this != &other) { + _data = other._data; + } + return *this; +} + +// Type accessor. If value is different type zero value for given time is returned. 
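+// Example (minimal usage sketch): for a value holding flux type "double",
+//   FluxValue v(new FluxDouble("1.4", 1.4));
+//   v.getDouble();   // 1.4
+//   v.getLong();     // 0 - a wrong-type getter returns the zero value
+//   v.getRawValue(); // "1.4" - the raw string form is available for any type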
+String FluxValue::getString() { + if(_data && (_data->getType() == FluxDatatypeString ||_data->getType() == FluxDatatypeDuration || _data->getType() == FluxBinaryDataTypeBase64)) { + FluxString *s = (FluxString *)_data.get(); + return s->value; + } + return ""; +} + +long FluxValue::getLong() { + if(_data && _data->getType() == FluxDatatypeLong) { + FluxLong *l = (FluxLong *)_data.get(); + return l->value; + } + return 0; +} + +unsigned long FluxValue::getUnsignedLong() { + if(_data && _data->getType() == FluxDatatypeUnsignedLong) { + FluxUnsignedLong *l = (FluxUnsignedLong *)_data.get(); + return l->value; + } + return 0; + +} +FluxDateTime FluxValue::getDateTime() { + if(_data && (_data->getType() == FluxDatatypeDatetimeRFC3339 ||_data->getType() == FluxDatatypeDatetimeRFC3339Nano)) { + FluxDateTime *d = (FluxDateTime *)_data.get(); + return *d; + } + return FluxDateTime("",FluxDatatypeDatetimeRFC3339, {0,0,0,0,0,0,0,0,0}, 0 ); +} + +bool FluxValue::getBool() { + if(_data && _data->getType() == FluxDatatypeBool) { + FluxBool *b = (FluxBool *)_data.get(); + return b->value; + } + return false; +} + +double FluxValue::getDouble() { + if(_data && _data->getType() == FluxDatatypeDouble) { + FluxDouble *d = (FluxDouble *)_data.get(); + return d->value; + } + return 0.0; +} + +// returns string representation of non-string values +String FluxValue::getRawValue() { + if(_data) { + return _data->getRawValue(); + } + return ""; +} + +bool FluxValue::isNull() { + return _data == nullptr; +} \ No newline at end of file diff --git a/src/query/FluxTypes.h b/src/query/FluxTypes.h new file mode 100644 index 0000000..d2d254c --- /dev/null +++ b/src/query/FluxTypes.h @@ -0,0 +1,167 @@ +/** + * + * FLuxTypes.h: InfluxDB flux types representation + * + * MIT License + * + * Copyright (c) 2020 InfluxData + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+*/ +#ifndef _FLUX_TYPES_H_ +#define _FLUX_TYPES_H_ + +#include +#include + +/** Supported flux types: + * - long - converts to long + * - unsignedLong - converts to unsigned long + * - double - converts to double + * - bool - converts to bool + * - dateTime:RFC3339 - converts to FluxDataTime + * - dateTime:RFC3339Nano - converts to FluxDataTime + * other types defaults to String + */ + +extern const char *FluxDatatypeString; +extern const char *FluxDatatypeDouble; +extern const char *FluxDatatypeBool; +extern const char *FluxDatatypeLong; +extern const char *FluxDatatypeUnsignedLong; +extern const char *FluxDatatypeDuration; +extern const char *FluxBinaryDataTypeBase64; +extern const char *FluxDatatypeDatetimeRFC3339; +extern const char *FluxDatatypeDatetimeRFC3339Nano; + +// Base type for all specific flux types +class FluxBase { +protected: + String _rawValue; +public: + FluxBase(String rawValue); + virtual ~FluxBase(); + String getRawValue() const { return _rawValue; } + virtual const char *getType() = 0; +}; + +// Represents flux long +class FluxLong : public FluxBase { +public: + FluxLong(String rawValue, long value); + long value; + virtual const char *getType() override; +}; + +// Represents flux unsignedLong +class FluxUnsignedLong : public FluxBase { +public: + FluxUnsignedLong(String rawValue, unsigned long value); + unsigned long value; + virtual const char *getType() override; +}; + +// Represents flux double +class FluxDouble : public FluxBase { +public: + FluxDouble(String rawValue, double value); + double value; + virtual const char *getType() override; +}; + +// Represents flux bool +class FluxBool : public FluxBase { +public: + FluxBool(String rawValue, bool value); + bool value; + virtual const char *getType() override; +}; + +// Represents flux dateTime:RFC3339 and dateTime:RFC3339Nano +// Date and time are stored in classic struct tm. +// Fraction of second is stored in microseconds +// There are several classic functions for using struct tm: http://www.cplusplus.com/reference/ctime/ +class FluxDateTime : public FluxBase { +protected: + const char *_type; +public: + FluxDateTime(String rawValue, const char *type, struct tm value, unsigned long microseconds); + // Struct tm for date and time + struct tm value; + // microseconds part + unsigned long microseconds; + // Formats the value part to string according to the given format. Microseconds are skipped. + // Format string must be compatible with the http://www.cplusplus.com/reference/ctime/strftime/ + String format(String formatString); + virtual const char *getType() override; +}; + +// Represents flux string, duration, base64binary +class FluxString : public FluxBase { +protected: + const char *_type; +public: + FluxString(String rawValue, const char *type); + String value; + virtual const char *getType() override; +}; + +/** + * FluxValue wraps a value from a flux query result column. + * It provides getter methods for supported flux types: + * * getString() - string, base64binary or duration + * * getLong() - long + * * getUnsignedLong() - unsignedLong + * * getDateTime() - dateTime:RFC3339 or dateTime:RFC3339Nano + * * getBool() - bool + * * getDouble() - double + * + * Calling improper type getter will result in zero (empty) value. + * Check for null value usig isNull(). + * Use getRawValue() for getting original string form. 
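+ *
+ * A minimal usage sketch, assuming a row already read from a FluxQueryResult named "result"
+ * with "_value" and "_time" columns:
+ *
+ *   FluxValue v = result.getValueByName("_value");
+ *   if(!v.isNull()) {
+ *       Serial.println(v.getDouble()); // 0.0 if the column is not a double
+ *   }
+ *   FluxDateTime time = result.getValueByName("_time").getDateTime();
+ *   Serial.println(time.format("%F %T"));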
+ * + **/ + +class FluxValue { +public: + FluxValue(); + FluxValue(FluxBase *value); + FluxValue(const FluxValue &other); + FluxValue& operator=(const FluxValue& other); + // Check if value represent null - not present - value. + bool isNull(); + // Returns a value of string, base64binary or duration type column, or empty string if column is a different type. + String getString(); + // Returns a value of long type column, or zero if column is a different type. + long getLong(); + // Returns a value of unsigned long type column, or zero if column is a different type. + unsigned long getUnsignedLong(); + // Returns a value of dateTime:RFC3339 or dateTime:RFC3339Nano, or zeroed FluxDateTime instance if column is a different type. + FluxDateTime getDateTime(); + // Returns a value of bool type column, or false if column is a different type. + bool getBool(); + // Returns a value of double type column, or 0.0 if column is a different type. + double getDouble(); + // Returns a value in the original string form, as presented in the response. + String getRawValue(); +private: + std::shared_ptr _data; +}; + +#endif //_FLUX_TYPES_H_ \ No newline at end of file diff --git a/src/query/HttpStreamScanner.cpp b/src/query/HttpStreamScanner.cpp new file mode 100644 index 0000000..40705de --- /dev/null +++ b/src/query/HttpStreamScanner.cpp @@ -0,0 +1,103 @@ +/** + * + * HttpStreamScanner.cpp: Scannes HttpClient stream for lines. Supports chunking. + * + * MIT License + * + * Copyright (c) 2020 InfluxData + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+*/ +#include "HttpStreamScanner.h" + +// Uncomment bellow in case of a problem and rebuild sketch +//#define INFLUXDB_CLIENT_DEBUG +#include "util/debug.h" + +HttpStreamScanner::HttpStreamScanner(HTTPClient *client, bool chunked) +{ + _client = client; + _stream = client->getStreamPtr(); + _chunked = chunked; + _chunkHeader = chunked; + _len = client->getSize(); + INFLUXDB_CLIENT_DEBUG(F("[D] HttpStreamScanner: chunked: %s, size: %d\n"), _chunked?"true":"false", _len); +} + +bool HttpStreamScanner::next() { + while(_client->connected() && (_len > 0 || _len == -1)) { + _line = _stream->readStringUntil('\n'); + INFLUXDB_CLIENT_DEBUG(F("[D] HttpStreamScanner: line: %s\n"), _line.c_str()); + ++_linesNum; + int lineLen = _line.length(); + if(lineLen == 0) { + _error = HTTPC_ERROR_READ_TIMEOUT; + return false; + } + int r = lineLen +1; //+1 for terminating \n + _line.trim(); //remove \r + if(!_chunked || !_chunkHeader) { + _read += r; + if(_lastChunkLine.length() > 0) { //fix broken line + _line = _lastChunkLine + _line; + _lastChunkLine = ""; + } + + } + if(_chunkHeader && r == 2) { //empty line at the end of chunk + //last line was complete so return + _line = _lastChunkLine; + _lastChunkLine = ""; + return true; + } + if(_chunkHeader){ + _chunkLen = (int) strtol((const char *) _line.c_str(), NULL, 16); + INFLUXDB_CLIENT_DEBUG(F("[D] HttpStreamScanner chunk len: %d\n"), _chunkLen); + _chunkHeader = false; + _read = 0; + if(_chunkLen == 0) { //last chunk + _error = 0; + _line = ""; + return false; + } else { + continue; + } + } else if(_chunked && _read >= _chunkLen){ //we reached end of chunk. + _lastChunkLine = _line; + _chunkHeader = true; + continue; + } + + if(_len > 0) { + _len -= r; + INFLUXDB_CLIENT_DEBUG(F("[D] HttpStreamScanner new len: %d\n"), _len); + } + return true; + } + if(!_client->connected() && ( (_chunked && _chunkLen > 0) || (!_chunked && _len > 0))) { //report error only if we didn't went to + _error = HTTPC_ERROR_CONNECTION_LOST; + INFLUXDB_CLIENT_DEBUG(F("HttpStreamScanner connection lost\n")); + } + return false; +} + +void HttpStreamScanner::close() { + _client->end(); +} + diff --git a/src/query/HttpStreamScanner.h b/src/query/HttpStreamScanner.h new file mode 100644 index 0000000..a96b5b4 --- /dev/null +++ b/src/query/HttpStreamScanner.h @@ -0,0 +1,64 @@ +/** + * + * HttpStreamScanner.h: Scannes HttpClient stream for lines. Supports chunking. + * + * MIT License + * + * Copyright (c) 2020 InfluxData + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. +*/ +#ifndef _HTTP_STREAM_SCANNER_ +#define _HTTP_STREAM_SCANNER_ + +#if defined(ESP8266) +# include +#elif defined(ESP32) +# include +#endif //ESP8266 + +/** + * HttpStreamScanner parses response stream from HTTPClient for lines. + * By repeatedly calling next() it searches for new line. + * If next() returns false, it can mean end of stream or an error. + * Check getError() for nonzero if an error occured + */ +class HttpStreamScanner { +public: + HttpStreamScanner(HTTPClient *client, bool chunked); + bool next(); + void close(); + const String &getLine() const { return _line; } + int getError() const { return _error; } + int getLinesNum() const {return _linesNum; } +private: + HTTPClient *_client; + Stream *_stream = nullptr; + int _len; + String _line; + int _linesNum= 0; + int _read = 0; + bool _chunked; + bool _chunkHeader; + int _chunkLen = 0; + String _lastChunkLine; + int _error = 0; +}; + +#endif //#_HTTP_STREAM_SCANNER_ \ No newline at end of file diff --git a/src/util/debug.h b/src/util/debug.h new file mode 100644 index 0000000..c8545c1 --- /dev/null +++ b/src/util/debug.h @@ -0,0 +1,38 @@ +/** + * + * debug.h: InfluxDB Client debug macros + * + * MIT License + * + * Copyright (c) 2020 InfluxData + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. +*/ +#ifndef _INFLUXDB_CLIENT_DEBUG_H +#define _INFLUXDB_CLIENT_DEBUG_H + +#include + +#ifdef INFLUXDB_CLIENT_DEBUG +# define INFLUXDB_CLIENT_DEBUG(fmt, ...) Serial.printf_P( (PGM_P)PSTR(fmt), ## __VA_ARGS__ ) +#else +# define INFLUXDB_CLIENT_DEBUG(fmt, ...) 
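+// Usage sketch: a source file turns the debug output on by defining the flag before
+// including this header, as the query sources do, e.g.:
+//   #define INFLUXDB_CLIENT_DEBUG
+//   #include "util/debug.h"
+//   ...
+//   INFLUXDB_CLIENT_DEBUG(F("[D] HttpStreamScanner: line: %s\n"), _line.c_str());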
+#endif //INFLUXDB_CLIENT_DEBUG + +#endif //# _INFLUXDB_CLIENT_DEBUG_H \ No newline at end of file diff --git a/src/util/helpers.cpp b/src/util/helpers.cpp new file mode 100644 index 0000000..3b92f93 --- /dev/null +++ b/src/util/helpers.cpp @@ -0,0 +1,48 @@ +/** + * + * helpers.cpp: InfluxDB Client util functions + * + * MIT License + * + * Copyright (c) 2020 InfluxData + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. +*/ +#include "helpers.h" + +void timeSync(const char *tzInfo, const char* ntpServer1, const char* ntpServer2, const char* ntpServer3) { + // Accurate time is necessary for certificate validion + + configTzTime(tzInfo,ntpServer1, ntpServer2, ntpServer3); + + // Wait till time is synced + Serial.print("Syncing time"); + int i = 0; + while (time(nullptr) < 1000000000ul && i < 40) { + Serial.print("."); + delay(500); + i++; + } + Serial.println(); + + // Show time + time_t tnow = time(nullptr); + Serial.print("Synchronized time: "); + Serial.println(ctime(&tnow)); +} \ No newline at end of file diff --git a/src/util/helpers.h b/src/util/helpers.h new file mode 100644 index 0000000..f5061ae --- /dev/null +++ b/src/util/helpers.h @@ -0,0 +1,37 @@ +/** + * + * helpers.h: InfluxDB Client util functions + * + * MIT License + * + * Copyright (c) 2020 InfluxData + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+*/ +#ifndef _INFLUXDB_CLIENT_HELPERS_H +#define _INFLUXDB_CLIENT_HELPERS_H + +#include + +// Synchronize time with NTP servers and waits for completition. Prints waiting progress and final synchronized time to the serial. +// Accurate time is necessary for certificate validion and writing points in batch +// For the fastest time sync find NTP servers in your area: https://www.pool.ntp.org/zone/ +void timeSync(const char *tzInfo, const char* ntpServer1, const char* ntpServer2 = nullptr, const char* ntpServer3 = nullptr); + +#endif //_INFLUXDB_CLIENT_HELPERS_H \ No newline at end of file diff --git a/test/TestSupport.cpp b/test/TestSupport.cpp deleted file mode 100644 index c9c21ec..0000000 --- a/test/TestSupport.cpp +++ /dev/null @@ -1,99 +0,0 @@ - -#include "TestSupport.h" - -extern int failures; - -bool deleteAll(String url) { - String deleteUrl = url + "/api/v2/delete"; - HTTPClient http; - int code = 0; - if(http.begin(deleteUrl)) { - code = http.POST(""); - http.end(); - } - return code == 204; -} - - -int countParts(String &str, char separator) { - int lines = 0; - int i,from = 0; - while((i = str.indexOf(separator, from)) >= 0) { - ++lines; - from = i+1; - } - // try last part - if(from < str.length() && str.substring(from).length()>0) { - ++lines; - } - return lines; -} - -String *getParts(String &str, char separator, int &count) { - count = countParts(str, separator); - String *ret = new String[count]; - int i,from = 0,p=0; - while((i = str.indexOf(separator, from)) >= 0) { - ret[p++] = str.substring(from,i); - from = i+1; - } - // try last part - if(from < str.length() && str.substring(from).length()>0) { - ret[p] = str.substring(from); - } - return ret; -} - -int countLines(String &str) { - return countParts(str, '\n'); -} - -String *getLines(String &str, int &count) { - return getParts(str, '\n', count); -} - - -bool testAssertm(int line, bool state,String message) { - if(!state) { - ++failures; - Serial.printf("Assert failure line %d%s%s\n", line, message.length()>0?": ":"",message.c_str()); - return true; - } - return false; -} - -bool testAssert(int line, bool state) { - return testAssertm(line, state, ""); -} - -bool waitServer(InfluxDBClient &client, bool state) { - int c = 0; - bool res = false; - while((res = client.validateConnection()) != state && c++ < 30) { - Serial.printf(" Server is not %s\n", state?"up":"down"); - delay(1000); - } - return res == state; -} - -String queryFlux(String url, String token, String org, String &fluxQuery) { - HTTPClient httpClient; - String queryUrl = url + "/api/v2/query?org=" + org; - if(!httpClient.begin(queryUrl)) { - return ""; - } - httpClient.addHeader(F("Content-Type"), F("application/vnd.flux")); - - httpClient.addHeader(F("Authorization"), "Token " + token); - - int statusCode = httpClient.POST(fluxQuery); - String queryResult; - if(statusCode == 200) { - queryResult = httpClient.getString(); - queryResult.trim(); - } - - httpClient.end(); - - return queryResult; -} \ No newline at end of file diff --git a/test/TestSupport.h b/test/TestSupport.h index c058226..d87b600 100644 --- a/test/TestSupport.h +++ b/test/TestSupport.h @@ -6,27 +6,121 @@ #define TEST_ASSERT(a) if(testAssert(__LINE__, (a))) break #define TEST_ASSERTM(a,m) if(testAssertm(__LINE__, (a),(m))) break -#include +#include "query/FluxParser.h" -bool deleteAll(String url); +int failures = 0; -int countParts(String &str, char separator); +void printFreeHeap() { + Serial.print("Free heap: "); + Serial.println(ESP.getFreeHeap()); +} -String *getParts(String &str, char 
separator, int &count); +bool deleteAll(String url) { + String deleteUrl = url + "/api/v2/delete"; + HTTPClient http; + int code = 0; + if(http.begin(deleteUrl)) { + code = http.POST(""); + http.end(); + } + return code == 204; +} -int countLines(String &str); +bool serverLog(String url,String mess) { + String logUrl = url + "/log"; + HTTPClient http; + int code = 0; + if(http.begin(logUrl)) { + code = http.POST(mess); + http.end(); + } + return code == 204; +} -String *getLines(String &str, int &count); +int countParts(String &str, char separator) { + int lines = 0; + int i,from = 0; + while((i = str.indexOf(separator, from)) >= 0) { + ++lines; + from = i+1; + } + // try last part + if(from < str.length() && str.substring(from).length()>0) { + ++lines; + } + return lines; +} -bool testAssertm(int line, bool state,String message); +String *getParts(String &str, char separator, int &count) { + count = countParts(str, separator); + String *ret = new String[count]; + int i,from = 0,p=0; + while((i = str.indexOf(separator, from)) >= 0) { + ret[p++] = str.substring(from,i); + from = i+1; + } + // try last part + if(from < str.length() && str.substring(from).length()>0) { + ret[p] = str.substring(from); + } + return ret; +} -bool testAssert(int line, bool state); +int countLines(FluxQueryResult flux) { + int lines = 0; + while(flux.next()) { + lines++; + } + flux.close(); + return lines; +} + +std::vector getLines(FluxQueryResult flux) { + std::vector lines; + while(flux.next()) { + String line; + int i=0; + for(auto &val: flux.getValues()) { + if(i>0) line += ","; + line += val.getRawValue(); + i++; + } + lines.push_back(line); + } + flux.close(); + return lines; +} + + +bool testAssertm(int line, bool state,String message) { + if(!state) { + ++failures; + Serial.printf("Assert failure line %d%s%s\n", line, message.length()>0?": ":"",message.c_str()); + return true; + } + return false; +} + +bool testAssert(int line, bool state) { + return testAssertm(line, state, ""); +} // Waits for server in desired state (up - true, down - false) -bool waitServer(InfluxDBClient &client, bool state); -// Sends Flux query and returns raw JSON formatted response -// Return raw query response in the form of CSV table. Empty string can mean that query hasn't found anything or an error. Check getLastStatusCode() for 200 -String queryFlux(String url, String token, String org, String &fluxQuery); +bool waitServer(InfluxDBClient &client, bool state) { + int c = 0; + bool res = false; + while((res = client.validateConnection()) != state && c++ < 30) { + Serial.printf(" Server is not %s\n", state?"up":"down"); + delay(1000); + } + return res == state; +} + +bool compareTm(tm &tm1, tm &tm2) { + time_t t1 = mktime(&tm1); + time_t t2 = mktime(&tm2); + return difftime(t1, t2) == 0; +} #endif //_TEST_SUPPORT_H_ \ No newline at end of file diff --git a/test/customSettings.h b/test/customSettings.h new file mode 100644 index 0000000..1c96599 --- /dev/null +++ b/test/customSettings.h @@ -0,0 +1,6 @@ +#ifndef _CUSTOM_SETTING_H_ +#define _CUSTOM_SETTING_H_ + + + +#endif //_CUSTOM_SETTING_H_ \ No newline at end of file diff --git a/test/server/Readme.md b/test/server/Readme.md index 65667a4..67fcdc5 100644 --- a/test/server/Readme.md +++ b/test/server/Readme.md @@ -1,6 +1,6 @@ # InfluxDB 2 mock server -Mock server which simulates InfluxDB 2 write and query API. +Mock server which simulates InfluxDB 1 and 2 write and query API. First time, run: `npm install` to download dependencies. 
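The test helpers above iterate a streamed FluxQueryResult instead of splitting a raw CSV response. A minimal sketch of the pattern they rely on (the server URL, org, bucket, token and the Flux query below are only placeholders):

```cpp
#include <InfluxDbClient.h>

// Placeholder connection values - replace with a real server, org, bucket and token
InfluxDBClient client("http://192.168.1.48:8086", "my-org", "my-bucket", "my-token");

void printLastHour() {
  // Any Flux query can be passed here; the mock server picks its canned response by the query text
  FluxQueryResult result = client.query("from(bucket: \"my-bucket\") |> range(start: -1h)");
  while (result.next()) {                  // parses the response row by row
    FluxValue value = result.getValueByName("_value");
    Serial.println(value.getRawValue());   // the raw string form works for any column type
  }
  if (result.getError() != "") {           // an empty error string means the iteration ended normally
    Serial.print("Query failed: ");
    Serial.println(result.getError());
  }
  result.close();                          // always release the underlying connection
}
```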
diff --git a/test/server/server.js b/test/server/server.js index b49d9f5..6b563f0 100644 --- a/test/server/server.js +++ b/test/server/server.js @@ -6,6 +6,7 @@ const app = express(); const port = 999; var pointsdb = []; var lastUserAgent = ''; +var chunked = false; app.use (function(req, res, next) { var data=''; @@ -15,7 +16,7 @@ app.use (function(req, res, next) { }); req.on('end', function() { - req.body = parsePoints(data); + req.body = data; next(); }); }); @@ -35,10 +36,17 @@ app.get('/ping', (req,res) => { res.status(204).end(); } }) +app.post('/log', (req,res) => { + console.log(req.body); + res.status(204).end(); +}) app.post('/api/v2/write', (req,res) => { + chunked = false; if(checkWriteParams(req, res) && handleAuthentication(req, res)) { - var points = req.body; + //console.log('Write'); + //console.log(req.body); + var points = parsePoints(req.body); if(Array.isArray(points) && points.length > 0) { var point = points[0]; if(point.tags.hasOwnProperty('direction')) { @@ -46,6 +54,7 @@ app.post('/api/v2/write', (req,res) => { case '429-1': res.set("Retry-After","30"); res.status(429).send("Limit exceeded"); + console.log('Retry-After 30'); break; case '429-2': res.status(429).send("Limit exceeded"); @@ -53,9 +62,10 @@ app.post('/api/v2/write', (req,res) => { case '503-1': res.set("Retry-After","10"); res.status(503).send("Server overloaded"); + console.log('Retry-After 10'); break; case '503-2': - console.log('Return'); + console.log('Server overloaded'); res.status(503).send("Server overloaded"); break; case 'delete-all': @@ -70,6 +80,9 @@ app.post('/api/v2/write', (req,res) => { points = []; res.status(500).send("internal server error"); break; + case 'chunked': + chunked = true; + break; } points.shift(); } @@ -91,7 +104,7 @@ app.post('/api/v2/write', (req,res) => { app.post('/write', (req,res) => { if(checkWriteParamsV1(req, res) ) { - var points = req.body; + var points = parsePoints(req.body); if(Array.isArray(points) && points.length > 0) { var point = points[0]; if(point.tags.hasOwnProperty('direction')) { @@ -133,13 +146,115 @@ app.post('/api/v2/delete', (req,res) => { res.status(204).end(); }); +var queryRes = { + "singleTable":`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +,result,table,_start,_stop,_time,_value,_field,_measurement,a,b +,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf +,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,3,adsfasdf +\r +`, + "nil-value": `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +,result,table,_start,_stop,_time,_value,_field,_measurement,a,b +,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,,f,test,1,adsfasdf +,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,,adsfasdf +,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:11:32.225467895Z,1122.45,f,test,3, +\r +`, +"multiTables":`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,unsignedLong,string,string,string,string +,result,table,_start,_stop,_time,_value,_field,_measurement,a,b +,_result,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,14,f,test,1,adsfasdf 
+,_result,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,66,f,test,1,adsfasdf +\r +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string +,result,table,_start,_stop,_time,_value,_field,_measurement,a,b +,_result1,1,2020-02-16T22:19:49.747562847Z,2020-02-17T22:19:49.747562847Z,2020-02-17T10:34:08.135814545Z,-4,i,test,1,adsfasdf +,_result1,1,2020-02-16T22:19:49.747562847Z,2020-02-17T22:19:49.747562847Z,2020-02-16T22:08:44.850214724Z,-1,i,test,1,adsfasdf +\r +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,bool,string,string,string,string +,result,table,_start,_stop,_time,_value,_field,_measurement,a,b +,_result2,2,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,false,b,test,0,brtfgh +,_result2,2,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.969100374Z,true,b,test,0,brtfgh +\r +#datatype,string,long,dateTime:RFC3339Nano,dateTime:RFC3339Nano,dateTime:RFC3339Nano,duration,string,string,string,base64Binary +,result,table,_start,_stop,_time,_value,_field,_measurement,a,b +,_result3,3,2020-02-10T22:19:49.747562847Z,2020-02-12T22:19:49.747562847Z,2020-02-11T10:34:08.135814545Z,1d2h3m4s,d,test,0,eHh4eHhjY2NjY2NkZGRkZA== +,_result3,3,2020-02-10T22:19:49.747562847Z,2020-02-12T22:19:49.747562847Z,2020-02-12T22:08:44.969100374Z,22h52s,d,test,0,ZGF0YWluYmFzZTY0 +\r +`, +"diffNum-data":`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,duration,base64Binary,dateTime:RFC3339 +,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start +,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z,2345234 +\r +`, +"diffNum-type-vs-header":`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,duration,base64Binary,dateTime:RFC3339 +,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note +,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z +\r +`, +"flux-error":`{"code":"invalid","message":"compilation failed: loc 4:17-4:86: expected an operator between two expressions"}`, +"invalid-datatype":`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,int,string,duration,base64Binary,dateTime:RFC3339 +,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start +,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z +,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z +\r +`, +"missing-datatype":`,result,table,_start,_stop,_time,_value,_field,_measurement,a,b +,_result3,3,2020-02-10T22:19:49.747562847Z,2020-02-12T22:19:49.747562847Z,2020-02-11T10:34:08.135814545Z,1d2h3m4s,d,test,0,eHh4eHhjY2NjY2NkZGRkZA== +,_result3,3,2020-02-10T22:19:49.747562847Z,2020-02-12T22:19:49.747562847Z,2020-02-12T22:08:44.969100374Z,22h52s,d,test,0,eHh4eHhjY2NjY2NkZGRkZA== +\r +`, +"error-it-row-full":`#datatype,string,string +,error,reference +,failed to create physical plan: invalid time bounds from procedure from: bounds contain zero time,897 +\r +`, +"error-it-row-no-reference":`#datatype,string,string 
+,error,reference +,failed to create physical plan: invalid time bounds from procedure from: bounds contain zero time, +\r +`, +"error-it-row-no-message":`#datatype,string,string +,error,reference +,, +\r +`, +"empty":`` +}; + app.post('/api/v2/query', (req,res) => { + //console.log("Query with: " + req.body); if(checkQueryParams(req, res) && handleAuthentication(req, res)) { - if(pointsdb.length > 0) { + var queryObj = JSON.parse(req.body); + var data = ''; + var status = 200; + if (queryObj["query"].startsWith('testquery-')) { + var qi = queryObj["query"].substring(10) ; + console.log('query: ' + qi + ' dataset'); + if(qi.endsWith('error')) { + status = 400; + } + data = queryRes[qi]; + } else if(pointsdb.length > 0) { console.log('query: ' + pointsdb.length + ' points'); - res.status(200).send(convertToCSV(pointsdb)); + data = convertToCSV(pointsdb); + } + if(data.length > 0) { + //console.log(data); + + if(chunked) { + var i = data.length/3; + res.set("Transfer-Encoding","chunked"); + res.status(status); + res.write(data.substring(0, i+1)); + res.write(data.substring(i+1, 2*i+1)); + res.write(data.substring(2*i+1)); + res.end(); + chunked = false; + } else { + res.status(status).send(data); + } } else { - res.status(200).end(); + res.status(status).end(); } } }); @@ -275,17 +390,22 @@ function checkQueryParams(req, res) { } } -function objectToCSV(obj, header) { +function objectToCSV(obj, type, level) { var line = ''; + if(level == 1) line = type==0?'#datatype,':','; + var i = 0; for (var index in obj) { - if (line != '') line += ',' + if (i>0) line += ','; if(typeof obj[index] == 'object') { - line += objectToCSV(obj[index], header); - } else if(header) { + line += objectToCSV(obj[index], type, level+1); + } else if(type == 0) { //datatype header + line += 'string'; + } else if(type == 1) { line += index; } else { line += obj[index]; } + i++; } return line; } @@ -295,12 +415,13 @@ function convertToCSV(objArray) { var str = ''; if(array.length > 0) { - str = objectToCSV(array[0], true) + '\r\n'; + str = objectToCSV(array[0], 0, 1) + '\r\n'; + str += objectToCSV(array[0], 1, 1) + '\r\n'; } for (var i = 0; i < array.length; i++) { var line = ''; - line = objectToCSV(array[i], false); + line = objectToCSV(array[i], 2, 1); str += line + '\r\n'; } diff --git a/test/test.ino b/test/test.ino index b431227..2640f17 100644 --- a/test/test.ino +++ b/test/test.ino @@ -28,7 +28,7 @@ String deviceName = "ESP8266"; #define INFLUXDB_CLIENT_TESTING_PASS "password" #define INFLUXDB_CLIENT_TESTING_BAD_URL "http://127.0.0.1:999" -int failures = 0; +#include "customSettings.h" #include "TestSupport.h" #include @@ -45,26 +45,37 @@ void setup() { Serial.println(); initInet(); +} +void loop() { //tests testPoint(); + testFluxTypes(); + testFluxParserEmpty(); + + testFluxParserSingleTable(); + testFluxParserNilValue(); + testFluxParserMultiTables(false); + testFluxParserMultiTables(true); + testFluxParserErrorDiffentColumnsNum(); + testFluxParserFluxError(); + testFluxParserInvalidDatatype(); + testFluxParserMissingDatatype(); + testFluxParserErrorInRow(); testBasicFunction(); testInit(); testV1(); testUserAgent(); testFailedWrites(); testTimestamp(); - testRetryOnFailedConnection(); - testBufferOverwriteBatchsize1(); - testBufferOverwriteBatchsize5(); - testServerTempDownBatchsize5(); - testRetriesOnServerOverload(); + // testRetryOnFailedConnection(); + // testBufferOverwriteBatchsize1(); + // testBufferOverwriteBatchsize5(); + // testServerTempDownBatchsize5(); + // testRetriesOnServerOverload(); 
Serial.printf("Test %s\n", failures ? "FAILED" : "SUCCEEDED"); -} - -void loop() { - delay(1000); + while(1) delay(1000); } void testPoint() { @@ -190,8 +201,10 @@ void testBasicFunction() { } TEST_ASSERT(client.isBufferEmpty()); String query = "select"; - String q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - TEST_ASSERT(countLines(q) == 6); //5 points+header + FluxQueryResult q = client.query(query); + int count = countLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERTM( count == 5, String(count) + " vs 5"); //5 points // test precision for (int i = (int)WritePrecision::NoTime; i <= (int)WritePrecision::NS; i++) { @@ -225,11 +238,18 @@ void testInit() { { InfluxDBClient client; String query = "select"; + FluxQueryResult q = client.query(query); + TEST_ASSERT(!q.next()); + TEST_ASSERT(q.getError() == "Unconfigured instance"); + TEST_ASSERT(client.getLastStatusCode() == 0); + TEST_ASSERT(client.getLastErrorMessage() == "Unconfigured instance"); + client.setConnectionParams(INFLUXDB_CLIENT_TESTING_URL, INFLUXDB_CLIENT_TESTING_ORG, INFLUXDB_CLIENT_TESTING_BUC, INFLUXDB_CLIENT_TESTING_TOK); String rec = "a,a=1 a=3"; TEST_ASSERT(client.writeRecord(rec)); - String q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - TEST_ASSERT(countLines(q) == 2); //3 points+header + q = client.query(query); + TEST_ASSERT(countLines(q) == 1); + TEST_ASSERTM(q.getError()=="", q.getError()); } TEST_END(); @@ -269,6 +289,7 @@ void testRetryOnFailedConnection() { InfluxDBClient clientOk(INFLUXDB_CLIENT_TESTING_URL, INFLUXDB_CLIENT_TESTING_ORG, INFLUXDB_CLIENT_TESTING_BUC, INFLUXDB_CLIENT_TESTING_TOK); clientOk.setWriteOptions(WritePrecision::NoTime, 1, 5); + Serial.println("Stop server!"); waitServer(clientOk, false); TEST_ASSERT(!clientOk.validateConnection()); Point *p = createPoint("test1"); @@ -287,8 +308,8 @@ void testRetryOnFailedConnection() { delete p; TEST_ASSERT(clientOk.isBufferEmpty()); String query = "select"; - String q = queryFlux(clientOk.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - TEST_ASSERT(countLines(q) == 4); //3 points+header + FluxQueryResult q = clientOk.query(query); + TEST_ASSERT(countLines(q) == 3); TEST_END(); deleteAll(INFLUXDB_CLIENT_TESTING_URL); @@ -318,16 +339,15 @@ void testBufferOverwriteBatchsize1() { TEST_ASSERT(client.isBufferEmpty()); String query = "select"; - String q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - int count; - String *lines = getLines(q, count); - TEST_ASSERTM(count == 6, String("6 != " + count)); //5 points+header - TEST_ASSERT(lines[1].indexOf(",8") > 0); - TEST_ASSERT(lines[2].indexOf(",9") > 0); - TEST_ASSERT(lines[3].indexOf(",10") > 0); - TEST_ASSERT(lines[4].indexOf(",11") > 0); - TEST_ASSERT(lines[5].indexOf(",12") > 0); - delete[] lines; + FluxQueryResult q = client.query(query); + std::vector lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERTM(lines.size() == 5, String("5 != " + lines.size())); //5 points + TEST_ASSERT(lines[0].indexOf(",8") > 0); + TEST_ASSERT(lines[1].indexOf(",9") > 0); + TEST_ASSERT(lines[2].indexOf(",10") > 0); + TEST_ASSERT(lines[3].indexOf(",11") > 0); + TEST_ASSERT(lines[4].indexOf(",12") > 0); TEST_END(); deleteAll(INFLUXDB_CLIENT_TESTING_URL); @@ -360,17 +380,16 @@ void testBufferOverwriteBatchsize5() { TEST_ASSERT(client.flushBuffer()); String query = "select"; - String 
q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - int count; - String *lines = getLines(q, count); - TEST_ASSERT(count == 13); //12 points+header - TEST_ASSERT(lines[1].indexOf(",16") > 0); - TEST_ASSERT(lines[2].indexOf(",17") > 0); - TEST_ASSERT(lines[3].indexOf(",18") > 0); - TEST_ASSERT(lines[4].indexOf(",19") > 0); - TEST_ASSERT(lines[5].indexOf(",20") > 0); - TEST_ASSERT(lines[12].indexOf(",27") > 0); - delete[] lines; + FluxQueryResult q = client.query(query); + std::vector lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 12); //12 points + TEST_ASSERT(lines[0].indexOf(",16") > 0); + TEST_ASSERT(lines[1].indexOf(",17") > 0); + TEST_ASSERT(lines[2].indexOf(",18") > 0); + TEST_ASSERT(lines[3].indexOf(",19") > 0); + TEST_ASSERT(lines[4].indexOf(",20") > 0); + TEST_ASSERT(lines[11].indexOf(",27") > 0); deleteAll(INFLUXDB_CLIENT_TESTING_URL); // buffer has been emptied, now writes should go according batch size for (int i = 0; i < 4; i++) { @@ -380,22 +399,24 @@ void testBufferOverwriteBatchsize5() { delete p; } TEST_ASSERT(!client.isBufferEmpty()); - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); + q = client.query(query); TEST_ASSERT(countLines(q) == 0); + TEST_ASSERTM(q.getError()=="", q.getError()); p = createPoint("test1"); p->addField("index", 4); TEST_ASSERT(client.writePoint(*p)); TEST_ASSERT(client.isBufferEmpty()); - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - lines = getLines(q, count); - TEST_ASSERT(count == 6); //5 points+header - TEST_ASSERT(lines[1].indexOf(",0") > 0); - TEST_ASSERT(lines[2].indexOf(",1") > 0); - TEST_ASSERT(lines[3].indexOf(",2") > 0); - TEST_ASSERT(lines[4].indexOf(",3") > 0); - TEST_ASSERT(lines[5].indexOf(",4") > 0); - delete[] lines; + + q = client.query(query); + lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 5); + TEST_ASSERT(lines[0].indexOf(",0") > 0); + TEST_ASSERT(lines[1].indexOf(",1") > 0); + TEST_ASSERT(lines[2].indexOf(",2") > 0); + TEST_ASSERT(lines[3].indexOf(",3") > 0); + TEST_ASSERT(lines[4].indexOf(",4") > 0); TEST_END(); deleteAll(INFLUXDB_CLIENT_TESTING_URL); @@ -416,8 +437,9 @@ void testServerTempDownBatchsize5() { } TEST_ASSERT(client.isBufferEmpty()); String query = "select"; - String q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - TEST_ASSERT(countLines(q) == 16); //15 points+header + FluxQueryResult q = client.query(query); + TEST_ASSERT(countLines(q) == 15); + TEST_ASSERTM(q.getError()=="", q.getError()); deleteAll(INFLUXDB_CLIENT_TESTING_URL); Serial.println("Stop server"); @@ -439,8 +461,10 @@ void testServerTempDownBatchsize5() { p->addField("index", 15); TEST_ASSERT(client.writePoint(*p)); TEST_ASSERT(client.isBufferEmpty()); - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - TEST_ASSERT(countLines(q) == 17); //16 points+header + q = client.query(query); + TEST_ASSERT(countLines(q) == 16); + TEST_ASSERTM(q.getError()=="", q.getError()); + deleteAll(INFLUXDB_CLIENT_TESTING_URL); Serial.println("Stop server"); @@ -460,17 +484,16 @@ void testServerTempDownBatchsize5() { waitServer(client, true); TEST_ASSERT(client.flushBuffer()); - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - int count; - String 
*lines = getLines(q, count); - TEST_ASSERT(count == 21); //20 points+header - TEST_ASSERT(lines[1].indexOf(",5") > 0); - TEST_ASSERT(lines[2].indexOf(",6") > 0); - TEST_ASSERT(lines[3].indexOf(",7") > 0); - TEST_ASSERT(lines[4].indexOf(",8") > 0); - TEST_ASSERT(lines[19].indexOf(",23") > 0); - TEST_ASSERT(lines[20].indexOf(",24") > 0); - delete[] lines; + q = client.query(query); + std::vector lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 20); + TEST_ASSERT(lines[0].indexOf(",5") > 0); + TEST_ASSERT(lines[1].indexOf(",6") > 0); + TEST_ASSERT(lines[2].indexOf(",7") > 0); + TEST_ASSERT(lines[3].indexOf(",8") > 0); + TEST_ASSERT(lines[18].indexOf(",23") > 0); + TEST_ASSERT(lines[19].indexOf(",24") > 0); deleteAll(INFLUXDB_CLIENT_TESTING_URL); TEST_END(); @@ -491,8 +514,9 @@ void testRetriesOnServerOverload() { } TEST_ASSERT(client.isBufferEmpty()); String query = "select"; - String q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - TEST_ASSERT(countLines(q) == 61); //60 points+header + FluxQueryResult q = client.query(query); + TEST_ASSERT(countLines(q) == 60); + TEST_ASSERTM(q.getError()=="", q.getError()); deleteAll(INFLUXDB_CLIENT_TESTING_URL); String rec = "a,direction=429-1 a=1"; @@ -522,13 +546,12 @@ void testRetriesOnServerOverload() { TEST_ASSERT(!client.isBufferEmpty()); TEST_ASSERT(client.flushBuffer()); TEST_ASSERT(client.isBufferEmpty()); - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - int count; - String *lines = getLines(q, count); - TEST_ASSERT(count == 40); //39 points+header - TEST_ASSERT(lines[1].indexOf(",11") > 0); - TEST_ASSERT(lines[39].indexOf(",49") > 0); - delete[] lines; + q = client.query(query); + std::vector lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 39); + TEST_ASSERT(lines[0].indexOf(",11") > 0); + TEST_ASSERT(lines[38].indexOf(",49") > 0); deleteAll(INFLUXDB_CLIENT_TESTING_URL); rec = "a,direction=429-2 a=1"; @@ -558,12 +581,12 @@ void testRetriesOnServerOverload() { TEST_ASSERT(!client.isBufferEmpty()); TEST_ASSERT(client.flushBuffer()); TEST_ASSERT(client.isBufferEmpty()); - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - lines = getLines(q, count); - TEST_ASSERT(count == 40); //39 points+header - TEST_ASSERT(lines[1].indexOf(",11") > 0); - TEST_ASSERT(lines[39].indexOf(",49") > 0); - delete[] lines; + q = client.query(query); + lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 39); + TEST_ASSERT(lines[0].indexOf(",11") > 0); + TEST_ASSERT(lines[38].indexOf(",49") > 0); deleteAll(INFLUXDB_CLIENT_TESTING_URL); rec = "a,direction=503-1 a=1"; @@ -593,13 +616,12 @@ void testRetriesOnServerOverload() { TEST_ASSERT(!client.isBufferEmpty()); TEST_ASSERT(client.flushBuffer()); TEST_ASSERT(client.isBufferEmpty()); - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - TEST_ASSERT(countLines(q) == 51); //50 points+header - lines = getLines(q, count); - TEST_ASSERT(count == 51); //50 points+header - TEST_ASSERT(lines[1].indexOf(",0") > 0); - TEST_ASSERT(lines[50].indexOf(",49") > 0); - delete[] lines; + q = client.query(query); + lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 50); + TEST_ASSERT(lines[0].indexOf(",0") > 0); + 
TEST_ASSERT(lines[49].indexOf(",49") > 0); deleteAll(INFLUXDB_CLIENT_TESTING_URL); rec = "a,direction=503-2 a=1"; @@ -630,12 +652,12 @@ void testRetriesOnServerOverload() { TEST_ASSERT(client.flushBuffer()); TEST_ASSERT(client.isBufferEmpty()); - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - lines = getLines(q, count); - TEST_ASSERT(count == 40); //39 points+header - TEST_ASSERT(lines[1].indexOf(",11") > 0); - TEST_ASSERT(lines[39].indexOf(",49") > 0); - delete[] lines; + q = client.query(query); + lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 39); + TEST_ASSERT(lines[0].indexOf(",11") > 0); + TEST_ASSERT(lines[38].indexOf(",49") > 0); TEST_END(); deleteAll(INFLUXDB_CLIENT_TESTING_URL); @@ -654,19 +676,19 @@ void testFailedWrites() { p->addTag("direction", i > 10 ? "500" : "400"); } p->addField("index", i); - TEST_ASSERTM(client.writePoint(*p) == (i % 5 != 0), String("i=") + i); + TEST_ASSERTM(client.writePoint(*p) == (i % 5 != 0), String("i=") + i + client.getLastErrorMessage()); delete p; } int count; String query = ""; - String q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - String *lines = getLines(q, count); - TEST_ASSERT(count == 17); //16 points+header - TEST_ASSERTM(lines[1].indexOf(",1") > 0, lines[1]); - TEST_ASSERTM(lines[5].indexOf(",6") > 0, lines[5]); - TEST_ASSERTM(lines[10].indexOf(",12") > 0, lines[10]); - TEST_ASSERTM(lines[16].indexOf(",19") > 0, lines[16]); - delete[] lines; + FluxQueryResult q = client.query(query); + std::vector lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 16); //12 points+header + TEST_ASSERTM(lines[0].indexOf(",1") > 0, lines[0]); + TEST_ASSERTM(lines[4].indexOf(",6") > 0, lines[4]); + TEST_ASSERTM(lines[9].indexOf(",12") > 0, lines[9]); + TEST_ASSERTM(lines[15].indexOf(",19") > 0, lines[15]); deleteAll(INFLUXDB_CLIENT_TESTING_URL); //test with batching @@ -682,14 +704,14 @@ void testFailedWrites() { delete p; } - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - lines = getLines(q, count); + q = client.query(query); + lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); //3 batches should be skipped - TEST_ASSERT(count == 16); //15 points+header - TEST_ASSERTM(lines[1].indexOf(",5") > 0, lines[1]); - TEST_ASSERTM(lines[6].indexOf(",15") > 0, lines[6]); - TEST_ASSERTM(lines[11].indexOf(",25") > 0, lines[11]); - delete[] lines; + TEST_ASSERT(lines.size() == 15); //15 points+header + TEST_ASSERTM(lines[0].indexOf(",5") > 0, lines[0]); + TEST_ASSERTM(lines[5].indexOf(",15") > 0, lines[5]); + TEST_ASSERTM(lines[10].indexOf(",25") > 0, lines[10]); TEST_END(); deleteAll(INFLUXDB_CLIENT_TESTING_URL); @@ -697,7 +719,7 @@ void testFailedWrites() { void testTimestamp() { TEST_INIT("testTimestamp"); - + serverLog(INFLUXDB_CLIENT_TESTING_URL, "testTimestamp"); InfluxDBClient client(INFLUXDB_CLIENT_TESTING_URL, INFLUXDB_CLIENT_TESTING_ORG, INFLUXDB_CLIENT_TESTING_BUC, INFLUXDB_CLIENT_TESTING_TOK); client.setWriteOptions(WritePrecision::S, 1, 5); //test with no batching @@ -725,10 +747,11 @@ void testTimestamp() { } int count; String query = ""; - String q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - String *lines = getLines(q, count); - TEST_ASSERT(count == 21); //20 points+header - for (int i = 1; i < count; i++) { + 
FluxQueryResult q = client.query(query); + std::vector lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 20); + for (int i = 0; i < lines.size(); i++) { int partsCount; String *parts = getParts(lines[i], ',', partsCount); TEST_ASSERTM(partsCount == 11, String(i) + ":" + lines[i]); //1measurement,4tags,5fields, 1timestamp @@ -736,7 +759,6 @@ void testTimestamp() { TEST_ASSERTM(parts[10].length() == 10, String(i) + ":" + lines[i]); delete[] parts; } - delete[] lines; deleteAll(INFLUXDB_CLIENT_TESTING_URL); client.setWriteOptions(WritePrecision::NoTime, 2, 5); @@ -747,39 +769,41 @@ void testTimestamp() { TEST_ASSERTM(client.writePoint(*p), String("i=") + i); delete p; } - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - lines = getLines(q, count); - TEST_ASSERT(count == 21); //20 points+header - for (int i = 1; i < count; i++) { + q = client.query(query); + lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 20); //20 points+header + for (int i = 0; i < lines.size(); i++) { int partsCount; String *parts = getParts(lines[i], ',', partsCount); TEST_ASSERTM(partsCount == 10, String(i) + ":" + lines[i]); //1measurement,4tags,5fields delete[] parts; } - delete[] lines; TEST_END(); deleteAll(INFLUXDB_CLIENT_TESTING_URL); + serverLog(INFLUXDB_CLIENT_TESTING_URL, "testTimestamp end"); } void testV1() { TEST_INIT("testV1"); InfluxDBClient client(INFLUXDB_CLIENT_TESTING_URL, INFLUXDB_CLIENT_TESTING_DB); + InfluxDBClient queryClient(INFLUXDB_CLIENT_TESTING_URL, INFLUXDB_CLIENT_TESTING_ORG, INFLUXDB_CLIENT_TESTING_BUC, INFLUXDB_CLIENT_TESTING_TOK); - TEST_ASSERT(client.validateConnection()); + TEST_ASSERTM(client.validateConnection(), client.getLastErrorMessage()); //test with no batching for (int i = 0; i < 20; i++) { Point *p = createPoint("test1"); p->addField("index", i); - TEST_ASSERTM(client.writePoint(*p), String("i=") + i); + TEST_ASSERTM(client.writePoint(*p), String("i=") + i + client.getLastErrorMessage()); delete p; } String query = "select"; - String q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - int count; - String *lines = getLines(q, count); - TEST_ASSERT(count == 21); + FluxQueryResult q = queryClient.query(query); + std::vector lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERTM(lines.size() == 20, String(lines.size()) + " vs 20"); deleteAll(INFLUXDB_CLIENT_TESTING_URL); //test with w/ batching 5 @@ -788,12 +812,13 @@ void testV1() { for (int i = 0; i < 15; i++) { Point *p = createPoint("test1"); p->addField("index", i); - TEST_ASSERTM(client.writePoint(*p), String("i=") + i); + TEST_ASSERTM(client.writePoint(*p), String("i=") + i + client.getLastErrorMessage()); delete p; } - q = queryFlux(client.getServerUrl(),INFLUXDB_CLIENT_TESTING_TOK, INFLUXDB_CLIENT_TESTING_ORG, query); - lines = getLines(q, count); - TEST_ASSERT(count == 16); + q = queryClient.query(query); + lines = getLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERT(lines.size() == 15); // test precision for (int i = (int)WritePrecision::NoTime; i <= (int)WritePrecision::NS; i++) { @@ -806,6 +831,606 @@ void testV1() { TEST_END(); deleteAll(INFLUXDB_CLIENT_TESTING_URL); } + +void testFluxTypes() { + TEST_INIT("testFluxTypes"); + FluxValue val1; + TEST_ASSERTM(val1.isNull(),"val1.isNull"); + TEST_ASSERTM(val1.getRawValue() == "","val1.getRawValue()"); + 
+    TEST_ASSERTM(val1.getString() == "","val1.getString()");
+    FluxValue val2(new FluxLong("111", 111));
+    TEST_ASSERTM(!val2.isNull(),"!val2.isNull");
+    TEST_ASSERTM(val2.getLong() == 111,"val2.getLong");
+    TEST_ASSERTM(val2.getRawValue() == "111","val2.getRawValue");
+    TEST_ASSERTM(val2.getUnsignedLong() == 0,"val2.getUnsignedLong");
+    TEST_ASSERTM(val2.getString() == "","val2.getString()");
+    val1 = val2;
+    TEST_ASSERTM(!val1.isNull(),"!val1.isNull");
+    TEST_ASSERTM(val1.getLong() == 111,"val1.getLong");
+    TEST_ASSERTM(val1.getRawValue() == "111","val1.getRawValue");
+    TEST_ASSERTM(val1.getString() == "","val1.getString()");
+    val2 = nullptr;
+    TEST_ASSERTM(val2.isNull(),"val2.isNull");
+    TEST_ASSERTM(!val1.isNull(),"!val1.isNull");
+    TEST_ASSERTM(val1.getLong() == 111,"val1.getLong");
+    TEST_ASSERTM(val1.getRawValue() == "111","val1.getRawValue");
+    TEST_ASSERTM(val1.getString() == "","val1.getString()");
+
+    FluxValue val3(new FluxUnsignedLong("123456", 123456));
+    TEST_ASSERTM(!val3.isNull(),"!val3.isNull");
+    TEST_ASSERTM(val3.getUnsignedLong() == 123456,"val3.getUnsignedLong");
+    TEST_ASSERTM(val3.getRawValue() == "123456","val3.getRawValue");
+    TEST_ASSERTM(val3.getLong() == 0,"val3.getLong");
+    TEST_ASSERTM(val3.getString() == "","val3.getString()");
+
+    val2 = val3;
+    TEST_ASSERTM(!val2.isNull(),"!val2.isNull");
+    TEST_ASSERTM(val2.getUnsignedLong() == 123456,"val2.getUnsignedLong");
+    TEST_ASSERTM(val2.getRawValue() == "123456","val2.getRawValue");
+    TEST_ASSERTM(val2.getLong() == 0,"val2.getLong");
+
+    FluxValue val4(new FluxDouble("12.14", 12.14));
+    TEST_ASSERTM(!val4.isNull(),"!val4.isNull");
+    TEST_ASSERTM(val4.getDouble() == 12.14,"val4.getDouble");
+    TEST_ASSERTM(val4.getLong() == 0,"val4.getLong");
+    TEST_ASSERTM(val4.getRawValue() == "12.14","val4.getRawValue");
+    TEST_ASSERTM(val4.getString() == "","val4.getString()");
+
+    FluxValue val5(new FluxBool("true", true));
+    TEST_ASSERTM(!val5.isNull(),"!val5.isNull");
+    TEST_ASSERTM(val5.getBool(),"val5.getBool");
+    TEST_ASSERTM(val5.getDouble() == 0.0,"val5.getDouble");
+    TEST_ASSERTM(val5.getLong() == 0,"val5.getLong");
+    TEST_ASSERTM(val5.getRawValue() == "true","val5.getRawValue");
+    TEST_ASSERTM(val5.getString() == "","val5.getString()");
+
+    FluxValue val6(new FluxDateTime("2020-05-21T09:34:15.1234Z", FluxDatatypeDatetimeRFC3339, {15,34,9,21,4,120,0,0,0}, 123400));
+    TEST_ASSERTM(!val6.isNull(),"!val6.isNull");
+    TEST_ASSERTM(!val6.getBool(),"val6.getBool");
+    TEST_ASSERTM(val6.getLong() == 0,"val6.getLong");
+    TEST_ASSERTM(val6.getRawValue() == "2020-05-21T09:34:15.1234Z","val6.getRawValue");
+    TEST_ASSERTM(val6.getString() == "","val6.getString()");
+    struct tm t1 = {15,34,9,21,4,120,0,0,0};
+    struct tm tx = val6.getDateTime().value;
+    TEST_ASSERTM(compareTm(tx,t1), "val6.getDateTime().value");
+    TEST_ASSERTM(val6.getDateTime().microseconds == 123400,"val6.getDateTime().microseconds");
+    String dtStr = val6.getDateTime().format("%F %T");
+    TEST_ASSERTM(dtStr == "2020-05-21 09:34:15",dtStr);
+
+    FluxValue val7(new FluxDateTime("2020-05-22T09:34:15.123456Z", FluxDatatypeDatetimeRFC3339Nano, {15,34,9,22,4,120,0,0,0}, 123456));
+    TEST_ASSERTM(!val7.isNull(),"!val7.isNull");
+    TEST_ASSERTM(!val7.getBool(),"val7.getBool");
+    TEST_ASSERTM(val7.getLong() == 0,"val7.getLong");
+    TEST_ASSERTM(val7.getRawValue() == "2020-05-22T09:34:15.123456Z","val7.getRawValue");
+    TEST_ASSERTM(val7.getString() == "","val7.getString()");
+    struct tm t2 = {15,34,9,22,4,120,0,0,0};
+    tx = val7.getDateTime().value;
+    TEST_ASSERTM(compareTm(tx,t2), "val7.getDateTime().value");
+    TEST_ASSERTM(val7.getDateTime().microseconds == 123456,"val7.getDateTime().microseconds");
+
+    FluxValue val8(new FluxString("test string", FluxDatatypeString));
+    TEST_ASSERTM(!val8.isNull(),"!val8.isNull");
+    TEST_ASSERTM(!val8.getBool(),"val8.getBool");
+    TEST_ASSERTM(val8.getLong() == 0,"val8.getLong");
+    TEST_ASSERTM(val8.getRawValue() == "test string","val8.getRawValue");
+    TEST_ASSERTM(val8.getString() == "test string","val8.getString()");
+
+    FluxValue val9(new FluxString("1h4m5s", FluxDatatypeDuration));
+    TEST_ASSERTM(!val9.isNull(),"!val9.isNull");
+    TEST_ASSERTM(!val9.getBool(),"val9.getBool");
+    TEST_ASSERTM(val9.getLong() == 0,"val9.getLong");
+    TEST_ASSERTM(val9.getRawValue() == "1h4m5s","val9.getRawValue");
+    TEST_ASSERTM(val9.getString() == "1h4m5s","val9.getString()");
+
+    FluxValue val10(new FluxString("ZGF0YWluYmFzZTY0", FluxBinaryDataTypeBase64));
+    TEST_ASSERTM(!val10.isNull(),"!val10.isNull");
+    TEST_ASSERTM(!val10.getBool(),"val10.getBool");
+    TEST_ASSERTM(val10.getLong() == 0,"val10.getLong");
+    TEST_ASSERTM(val10.getRawValue() == "ZGF0YWluYmFzZTY0","val10.getRawValue");
+    TEST_ASSERTM(val10.getString() == "ZGF0YWluYmFzZTY0","val10.getString()");
+    TEST_END();
+}
+
+void testFluxParserEmpty() {
+    TEST_INIT("testFluxParser");
+    FluxQueryResult flux("Error sss");
+    TEST_ASSERTM(!flux.next(),"!flux.next()");
+    TEST_ASSERTM(flux.getError() == "Error sss","flux.getError");
+    TEST_ASSERTM(flux.getValues().size() == 0,"flux.getValues().size()");
+    TEST_ASSERTM(flux.getColumnsDatatype().size() == 0,"flux.getColumnsDatatype().size()");
+    TEST_ASSERTM(flux.getColumnsName().size() == 0,"flux.getColumnsName().size()");
+    TEST_ASSERTM(flux.getValueByIndex(0).isNull(),"flux.getValueByIndex(0).isNull()");
+    TEST_ASSERTM(!flux.hasTableChanged(),"hasTableChanged");
+    TEST_ASSERTM(flux.getTablePosition()==-1,"getTablePosition");
+    TEST_ASSERTM(flux.getValueByName("xxx").isNull(),"flux.getValueByName(\"xxx\").isNull()");
+
+    flux.close();
+    // test uninitialized client
+    InfluxDBClient client;
+    flux = client.query("s");
+    TEST_ASSERTM(!flux.next(),"!flux.next()");
+    TEST_ASSERTM(flux.getError() == "Unconfigured instance",flux.getError());
+
+    flux.close();
+
+    //test empty results set
+    InfluxDBClient client2(INFLUXDB_CLIENT_TESTING_URL, INFLUXDB_CLIENT_TESTING_ORG, INFLUXDB_CLIENT_TESTING_BUC, INFLUXDB_CLIENT_TESTING_TOK);
+    flux = client2.query("testquery-empty");
+
+    TEST_ASSERTM(!flux.next(),"!flux.next()");
+    TEST_ASSERTM(flux.getError() == "",flux.getError());
+
+    flux.close();
+
+    TEST_END();
+}
+
+bool testFluxDateTimeValue(FluxQueryResult flux, int columnIndex, const char *columnName, const char *rawValue, tm time, unsigned long us) {
+    do {
+        TEST_ASSERTM(flux.getValueByIndex(columnIndex).getRawValue() == rawValue, flux.getValueByName(columnName).getRawValue());
+        FluxDateTime dt = flux.getValueByIndex(columnIndex).getDateTime();
+        TEST_ASSERTM(compareTm(time, dt.value), flux.getValueByIndex(columnIndex).getRawValue());
+        TEST_ASSERTM(dt.microseconds == us, String(dt.microseconds) + " vs " + String(us));
+        dt = flux.getValueByName(columnName).getDateTime();
+        TEST_ASSERTM(compareTm(time, dt.value), flux.getValueByName(columnName).getRawValue());
+        TEST_ASSERTM(dt.microseconds == us, String(dt.microseconds) + " vs " + String(us));
+        return true;
+    } while(0);
+    return false;
+}
+
+bool testStringValue(FluxQueryResult flux, int columnIndex, const char *columnName, const char *rawValue) {
+    do {
+        TEST_ASSERTM(flux.getValueByIndex(columnIndex).getString() == rawValue, flux.getValueByIndex(columnIndex).getString());
+        TEST_ASSERTM(flux.getValueByName(columnName).getString() == rawValue, flux.getValueByName(columnName).getString());
+        TEST_ASSERTM(flux.getValueByName(columnName).getRawValue() == rawValue, flux.getValueByName(columnName).getRawValue());
+        return true;
+    } while(0);
+    return false;
+}
+
+bool testStringVector(std::vector<String> vect, const char *values[], int size) {
+    do {
+        TEST_ASSERTM(vect.size() == size, String(vect.size()));
+        for(int i=0;i<
     p->addTag("SSID", WiFi.SSID());
@@ -833,28 +1458,10 @@ void initInet() {
         Serial.println("Wifi connection failed");
         while (1) delay(100);
     } else {
-        Serial.printf("Connected to: %s\n", WiFi.SSID().c_str());
-
-        configTime(0, 0, "pool.ntp.org", "0.cz.pool.ntp.org", "1.cz.pool.ntp.org");
-        setenv("TZ", "CET-1CEST,M3.5.0,M10.5.0/3", 1);
-
-        // Wait till time is synced
-        Serial.print("Waiting till time is synced ");
-        i = 0;
-        while (time(nullptr) < 1000000000ul && i < 100) {
-            Serial.print(".");
-            delay(100);
-            i++;
-        }
-        Serial.println("");
-        printTime();
-    }
-}
+        Serial.printf("Connected to: %s (%d)\n", WiFi.SSID().c_str(), WiFi.RSSI());

-void printTime() {
-    time_t now = time(nullptr);
-    struct tm *tmstruct = localtime(&now);
-    Serial.printf("Local: %d.%d.%d %02d:%02d\n", tmstruct->tm_mday, (tmstruct->tm_mon) + 1, (tmstruct->tm_year) + 1900, tmstruct->tm_hour, tmstruct->tm_min);
-    tmstruct = gmtime(&now);
-    Serial.printf("GMT: %d.%d.%d %02d:%02d\n", tmstruct->tm_mday, (tmstruct->tm_mon) + 1, (tmstruct->tm_year) + 1900, tmstruct->tm_hour, tmstruct->tm_min);
+        timeSync("CET-1CEST,M3.5.0,M10.5.0/3", "pool.ntp.org", "0.cz.pool.ntp.org", "1.cz.pool.ntp.org");
+
+        deleteAll(INFLUXDB_CLIENT_TESTING_URL);
+    }
 }
\ No newline at end of file
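For orientation, the streamed query API exercised by the tests above is used roughly as follows. This is a minimal sketch distilled from the test code, not part of the patch itself; the connection constants, the Flux query string, and the `_value`/`_time` column names are placeholders that depend on your server and query.

```cpp
#include <InfluxDbClient.h>

// Placeholder connection parameters - replace with your own server URL, org, bucket and token.
InfluxDBClient client("http://192.168.1.48:9999", "my-org", "my-bucket", "my-token");

void queryExample() {
  // Illustrative Flux query; the columns it returns determine the names usable below.
  String flux = "from(bucket: \"my-bucket\") |> range(start: -1h) |> filter(fn: (r) => r._measurement == \"wifi_status\")";

  // query() returns a FluxQueryResult that parses the response record by record.
  FluxQueryResult result = client.query(flux);

  // next() advances to the next record; it returns false at the end or on error.
  while (result.next()) {
    // Values are accessed by column name and converted via the FluxValue getters.
    FluxValue value = result.getValueByName("_value");
    Serial.print("value: ");
    Serial.println(value.getRawValue());

    FluxDateTime time = result.getValueByName("_time").getDateTime();
    Serial.print("time: ");
    Serial.println(time.format("%F %T"));
  }

  // Check for an error after iteration and always release the connection.
  if (result.getError() != "") {
    Serial.print("Query error: ");
    Serial.println(result.getError());
  }
  result.close();
}
```

Because the result is consumed one record at a time from the streamed CSV response, rows can be processed as they arrive instead of buffering the whole reply in RAM, which is why the tests above only collect the extracted lines they need.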