Importing metadata from a CSV file into DataHub

A data platform represents a third-party system from which DataHub collects metadata; every dataset in DataHub is associated with a platform, such as MySQL, Oracle, Hive, or HDFS.

In some cases, however, the metadata you want to import does not belong to any platform DataHub supports out of the box. What then?

DataHub offers ingestion through its REST API, with SDKs currently available for Python and Java. The walkthrough below uses the Java SDK.

1. Add the SDK

Add the dependency to pom.xml:

<!-- https://mvnrepository.com/artifact/io.acryl/datahub-client -->
<dependency>
    <groupId>io.acryl</groupId>
    <artifactId>datahub-client</artifactId>
    <!-- replace with the latest version number -->
    <version>0.0.1</version>
</dependency>

2. Add a data platform

A data platform can be registered with a name and a logo. Once it has been added and datasets have been ingested under it, it shows up on the DataHub home page.

Since this endpoint is simple, we call the HTTP API directly:

# Add the SQLFlow data platform
curl 'http://localhost:8080/entities?action=ingest' -X POST --data '{
   "entity":{
      "value":{
         "com.linkedin.metadata.snapshot.DataPlatformSnapshot":{
            "aspects":[
               {
                  "com.linkedin.dataplatform.DataPlatformInfo":{
                      "datasetNameDelimiter": "/",
                      "name": "SQLFlow",
                      "type": "OTHERS",
                      "logoUrl": "http://101.43.8.206/lineage/images/logo.png"
                  }
               }
            ],
            "urn":"urn:li:dataPlatform:SQLFlow"
         }
      }
   }
}'
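
To verify the platform was created, the entity can be read back by its URL-encoded URN through the same entities endpoint. Below is a minimal sketch using Java 11's built-in HttpClient, assuming the same local deployment as above:

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Read the SQLFlow platform entity back to confirm the ingest call worked.
String urn = URLEncoder.encode("urn:li:dataPlatform:SQLFlow", StandardCharsets.UTF_8);
HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8080/entities/" + urn))
        .GET()
        .build();
HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString()); // may throw IOException/InterruptedException
System.out.println(response.statusCode() + " " + response.body());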

3. Add a dataset

3.1. Configure the connection parameters

import datahub.client.rest.RestEmitter;
//...
RestEmitter emitter = RestEmitter.create(b -> b
        .server("http://localhost:8080")
        // Auth token for Managed DataHub:
        //.token(AUTH_TOKEN_IF_NEEDED)
        // Override the default timeout of 10 seconds:
        //.timeoutSec(OVERRIDE_DEFAULT_TIMEOUT_IN_SECONDS)
        // Add additional headers:
        //.extraHeaders(Collections.singletonMap("Session-token", "MY_SESSION"))
        // Customize the HttpClient's connection TTL:
        //.customizeHttpAsyncClient(c -> c.setConnectionTimeToLive(30, TimeUnit.SECONDS))
);

3.2. Define a dataset and emit it

import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.client.rest.RestEmitter;
import datahub.client.Callback;
import datahub.client.MetadataWriteResponse;
import org.apache.http.HttpResponse; // the callback below casts the underlying response to this type
// ... followed by

// Creates the emitter with the default coordinates and settings
RestEmitter emitter = RestEmitter.createWithDefaults(); 

MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.builder()
        .entityType("dataset")
        .entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.user-table,PROD)")
        .upsert()
        .aspect(new DatasetProperties().setDescription("This is the canonical User profile dataset")) // a minimal aspect carrying only a description; see 3.3 for a full schema
        .build();


// Non-blocking using callback
emitter.emit(mcpw, new Callback() {
      @Override
      public void onCompletion(MetadataWriteResponse response) {
        if (response.isSuccess()) {
          System.out.println(String.format("Successfully emitted metadata event for %s", mcpw.getEntityUrn()));
        } else {
          // Get the underlying http response
          HttpResponse httpResponse = (HttpResponse) response.getUnderlyingResponse();
          System.out.println(String.format("Failed to emit metadata event for %s, aspect: %s with status code: %d",
              mcpw.getEntityUrn(), mcpw.getAspectName(), httpResponse.getStatusLine().getStatusCode()));
          // Print the server side exception if it was captured
          if (response.getServerException() != null) {
            System.out.println(String.format("Server side exception was %s", response.getServerException()));
          }
        }
      }

      @Override
      public void onFailure(Throwable exception) {
        System.out.println(
            String.format("Failed to emit metadata event for %s, aspect: %s due to %s", mcpw.getEntityUrn(),
                mcpw.getAspectName(), exception.getMessage()));
      }
    });
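
If you prefer to block until the write has been acknowledged, emit() also returns a Future. A minimal sketch of the blocking variant:

import java.util.concurrent.Future;

// Blocking variant: wait for the server to acknowledge the write.
Future<MetadataWriteResponse> future = emitter.emit(mcpw, null);
MetadataWriteResponse response = future.get(); // throws if the request is interrupted or fails
System.out.println("Emit succeeded: " + response.isSuccess());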

3.3. Define the schema metadata

Note: the ingested values must not contain Chinese characters; if they do, the request fails with an error.

import java.util.List;
import java.util.Map;

import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.schema.OtherSchema;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaFieldArray;
import com.linkedin.schema.SchemaFieldDataType;
import com.linkedin.schema.SchemaMetadata;
// ...

// `list` holds one Map per CSV row, with keys such as
// "schemaname", "tablename", "columnname" and "external_type".
String schemaname = list.get(0).get("schemaname");
String tablename = list.get(0).get("tablename"); // use this to build the dataset URN (see 3.2)

SchemaMetadata metadata = new SchemaMetadata();
metadata.setSchemaName(schemaname);

// SQLFlow has no native schema format, so wrap it in OtherSchema
SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
OtherSchema schema = new OtherSchema();
schema.setRawSchema("SQLFlow");
platformSchema.setOtherSchema(schema);
metadata.setPlatformSchema(platformSchema);

metadata.setPlatform(new DataPlatformUrn("SQLFlow")); // the platform registered in step 2
metadata.setVersion(0);
metadata.setHash(hashKeyForDisk(key)); // hashKeyForDisk/key: caller-supplied content hash for the schema

// Build one SchemaField per CSV row
SchemaFieldArray fieldArray = new SchemaFieldArray();
for (Map<String, String> map : list) {
	SchemaField field = new SchemaField();
	field.setDescription("");
	field.setFieldPath(map.get("columnname"));
	field.setNativeDataType(map.get("external_type"));
	field.setType(getDataType(field.getNativeDataType())); // see the helper sketch below
	fieldArray.add(field);
}
metadata.setFields(fieldArray);
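
The snippet relies on a `list` parsed from the CSV file and on helpers (getDataType, hashKeyForDisk) that are not part of the DataHub SDK. Below is a rough sketch of how `list` and getDataType might look; the CSV column order and the type mapping are assumptions to adapt to your own file:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.linkedin.schema.BooleanType;
import com.linkedin.schema.NumberType;
import com.linkedin.schema.SchemaFieldDataType;
import com.linkedin.schema.StringType;

// Reads a header-first CSV (e.g. schemaname,tablename,columnname,external_type)
// into the List<Map<String, String>> iterated over in 3.3.
static List<Map<String, String>> readCsv(String path) throws IOException {
    List<String> lines = Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8);
    String[] header = lines.get(0).split(",");
    List<Map<String, String>> rows = new ArrayList<>();
    for (String line : lines.subList(1, lines.size())) {
        String[] cells = line.split(",");
        Map<String, String> row = new HashMap<>();
        for (int i = 0; i < header.length && i < cells.length; i++) {
            row.put(header[i].trim(), cells[i].trim());
        }
        rows.add(row);
    }
    return rows;
}

// Maps a native type name to a DataHub SchemaFieldDataType;
// unrecognized types fall back to string.
static SchemaFieldDataType getDataType(String nativeType) {
    switch (nativeType == null ? "" : nativeType.toLowerCase()) {
        case "int":
        case "bigint":
        case "float":
        case "decimal":
            return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NumberType()));
        case "boolean":
            return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new BooleanType()));
        default:
            return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType()));
    }
}

Once built, `metadata` is emitted exactly like the DatasetProperties aspect in 3.2: pass it to .aspect(metadata) on the MetadataChangeProposalWrapper.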

4. Delete datasets

To remove everything that was ingested under the SQLFlow platform in the PROD environment, use the DataHub CLI:

datahub delete --env PROD --entity_type dataset --platform SQLFlow
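
By default this performs a soft delete: the datasets disappear from the UI but remain in the backend. To purge them permanently, the CLI also accepts a hard-delete flag:

datahub delete --env PROD --entity_type dataset --platform SQLFlow --hard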