[Flink Series Part 3] Accessing Metrics Data with the InfluxDB Java Client

InfluxDB Java Client

The highest InfluxDB version currently available as a Docker image is 1.8.3, while the latest official release is 2.0.

Note: We recommend using the new client libraries on this page to leverage the new read (via Flux) and write APIs and prepare for conversion to InfluxDB 2.0 and InfluxDB Cloud 2.0. For more information, see InfluxDB 2.0 API compatibility endpoints. Client libraries for InfluxDB 1.7 and earlier may continue to work, but are not maintained by InfluxData.

In other words, the official recommendation is to use the new read (via Flux) and write APIs to access InfluxDB 2.0.

Checking the official documentation, the available client library is:
influxdb-client-java

To keep up with the times, add the latest client dependency:

<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <version>1.12.0</version>
</dependency>

Creating a connection

// token is an InfluxDB 2.0 API token (a char[] in this overload)
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token);

//
// Create bucket "iot-bucket" with data retention set to 3,600 seconds
//
BucketRetentionRules retention = new BucketRetentionRules();
retention.setEverySeconds(3600);

Bucket bucket = influxDBClient.getBucketsApi().createBucket("iot-bucket", retention, "12bdc4164c2e8141");
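
As a follow-up, here is a minimal sketch of writing a point and reading it back through the 2.0 write and query APIs. It reuses the URL, bucket and org ID from the snippet above; the token value is a placeholder.

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxTable;

import java.time.Instant;
import java.util.List;

public class InfluxDB2WriteReadExample {

    public static void main(String[] args) {
        char[] token = "my-token".toCharArray();   // placeholder API token
        String org = "12bdc4164c2e8141";           // org ID from the snippet above
        String bucket = "iot-bucket";

        InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8086", token, org, bucket);

        // Write a single point synchronously
        Point point = Point.measurement("mem")
                .addTag("host", "host1")
                .addField("used_percent", 23.43)
                .time(Instant.now(), WritePrecision.NS);
        client.getWriteApiBlocking().writePoint(point);

        // Read it back with Flux (uses the default org/bucket from the factory call)
        List<FluxTable> tables = client.getQueryApi().query(
                "from(bucket: \"" + bucket + "\") |> range(start: -10m)");
        tables.forEach(table -> table.getRecords().forEach(record ->
                System.out.println(record.getTime() + " " + record.getField() + "=" + record.getValue())));

        client.close();
    }
}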

How to access InfluxDB 1.8

Query data in InfluxDB 1.8.0+ using the InfluxDB 2.0 API and Flux (endpoint should be enabled by flux-enabled option)

Although the InfluxDB 2.0 API is compatible with InfluxDB 1.8.0+, the Flux endpoint must be enabled via the flux-enabled option:

[http]
  enabled = true
  bind-address = ":8086"
  auth-enabled = true
  log-enabled = true
  suppress-write-log = false
  write-tracing = false
  flux-enabled = true  # this is the key setting
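
A quick way to confirm the option took effect is to run a trivial Flux query through the 2.0-compatible client: if flux-enabled is still false, the call fails with an HTTP error. The sketch below uses placeholder host and credentials (on 1.8, buckets() simply lists the database/retention-policy pairs).

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;

public class FluxEnabledCheck {

    public static void main(String[] args) {
        InfluxDBClient client = InfluxDBClientFactory.createV1("http://localhost:8086",
                "username", "password".toCharArray(), "telegraf", "autogen");
        try {
            // Any Flux query will do; it only succeeds when the /api/v2/query endpoint is on
            client.getQueryApi().query("buckets() |> limit(n: 1)");
            System.out.println("Flux endpoint is enabled");
        } catch (Exception e) {
            System.out.println("Flux endpoint not available: " + e.getMessage());
        } finally {
            client.close();
        }
    }
}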

What are Flux and InfluxQL?

Flux is an alternative to InfluxQL and other SQL-like query languages for querying and analyzing data. Flux follows a functional language pattern, which makes it powerful, flexible, and able to overcome many of the limitations of InfluxQL.

It appears the official documentation now leans toward Flux syntax.

Flux syntax

Enable Flux

Example (a bucket is a concept introduced in InfluxDB 2.0; InfluxDB 1.x does not have it):

data = from(bucket: "db/rp")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "example-measurement" and
    r._field == "example-field"
  )

InfluxDB 1.8 and 2.0

Key-Concepts
From that article you can see that 2.0 merges Database and Retention Policy into a Bucket. Flink's metrics are therefore collected in flink/flink.

And what is an Organization?

An InfluxDB organization is a workspace for a group of users. All dashboards, tasks, buckets, and users belong to an organization. For more information, see Manage organizations.
InfluxDB 1.8 has no concept of an organization; as the sketch below and the 1.8 example show, the org for version 1.8 is simply org=-.
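
The mapping is easiest to see when the client options are built by hand. The following is a minimal sketch of what InfluxDBClientFactory.createV1 effectively sets up for a 1.8 server: org becomes -, the bucket becomes database/retention_policy, and the token is username:password (host and credentials here are placeholders).

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.InfluxDBClientOptions;

public class InfluxDB18MappingExample {

    public static void main(String[] args) {
        InfluxDBClientOptions options = InfluxDBClientOptions.builder()
                .url("http://localhost:8086")
                .org("-")                                        // 1.8 has no organizations
                .bucket("flink/flink")                           // database/retention_policy
                .authenticateToken("flink:flink".toCharArray())  // username:password
                .build();

        InfluxDBClient client = InfluxDBClientFactory.create(options);
        // use client.getQueryApi() / getWriteApi() as with a 2.0 server
        client.close();
    }
}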

Accessing InfluxDB 1.8

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxTable;

import java.util.List;

public class InfluxDB18Example {

    public static void main(final String[] args) {

        String database = "telegraf";
        String retentionPolicy = "autogen";

        InfluxDBClient client = InfluxDBClientFactory.createV1("http://localhost:8086",
                "username",
                "password".toCharArray(),
                database,
                retentionPolicy);

        System.out.println("*** Write Points ***");

        // WriteApi batches writes asynchronously; closing it flushes any pending points
        try (WriteApi writeApi = client.getWriteApi()) {

            Point point = Point.measurement("mem")
                    .addTag("host", "host1")
                    .addField("used_percent", 29.43234543);

            System.out.println(point.toLineProtocol());

            writeApi.writePoint(point);
        }

        System.out.println("*** Query Points ***");
        String query = String.format("from(bucket: \"%s/%s\") |> range(start: -1h)", database, retentionPolicy);

        List<FluxTable> tables = client.getQueryApi().query(query);
        tables.get(0).getRecords()
                .forEach(record -> System.out.println(String.format("%s %s: %s %s",
                        record.getTime(), record.getMeasurement(), record.getField(), record.getValue())));

        client.close();
    }
}
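
Note that the point above carries no timestamp, so the server assigns one at write time. If an explicit timestamp is needed, it can be attached before writing, for example:

// Assumes: import java.time.Instant; import com.influxdb.client.domain.WritePrecision;
Point point = Point.measurement("mem")
        .addTag("host", "host1")
        .addField("used_percent", 29.43234543)
        .time(Instant.now(), WritePrecision.NS);   // explicit nanosecond timestamp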

Accessing the Flink checkpoint path metric

  // Query the last 30 minutes of the jobmanager_job_lastCheckpointExternalPath
  // metric for the given Flink job and map the result rows to a POJO.
  public static List<JobLastCheckpointExternalPath> getCheckPoints(String jobId) {
    InfluxDbConfig config = new InfluxDbConfig();
    config.setHost("http://10.11.159.156:8099");
    config.setDatabase("flink");
    config.setPassword("flink");
    config.setUsername("flink");
    config.setRetentionPolicy("one_hour");

    String database = config.getDatabase();
    String retentionPolicy = config.getRetentionPolicy();

    InfluxDBClient client = InfluxDBClientFactory.createV1(config.getHost(),
        config.getUsername(),
        config.getPassword().toCharArray(),
        database,
        retentionPolicy);
    client.setLogLevel(LogLevel.BASIC);

    QueryApi queryApi = client.getQueryApi();

    String query = String.format("from(bucket: \"%s/%s\") |> range(start: -30m) |> filter(fn: (r) =>\n" +
        "    r._measurement == \"jobmanager_job_lastCheckpointExternalPath\" and\n" +
        "    r.job_id == \"%s\"\n" +
        "  ) |> sort(columns: [\"_time\"], desc: true) |> limit(n:100)", database, retentionPolicy, jobId);
    // Query data and map each record onto the POJO below
    List<JobLastCheckpointExternalPath> tables = queryApi.query(query, JobLastCheckpointExternalPath.class);

    client.close();
    return tables;
  }

  @Measurement(name = "jobmanager_job_lastCheckpointExternalPath")
  public static class JobLastCheckpointExternalPath {
    @Column(timestamp = true)
    Instant time;

    @Column(name = "job_name")
    String jobName;

    @Column(name = "job_id")
    String jobId;

    @Column(name = "value")
    String value;

    @Override
    public String toString() {
      return "JobLastCheckpointExternalPath{" +
          "time=" + time +
          ", jobName='" + jobName + '\'' +
          ", jobId='" + jobId + '\'' +
          ", value='" + value + '\'' +
          '}';
    }
  }
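
A hypothetical usage sketch, placed in the same class as getCheckPoints (the job id below is a placeholder):

  public static void main(String[] args) {
    // Placeholder Flink job id; substitute a real one from the Flink web UI or REST API
    List<JobLastCheckpointExternalPath> paths = getCheckPoints("0123456789abcdef0123456789abcdef");
    paths.forEach(p -> System.out.println(p));   // prints via the toString() defined above
  }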