A data platform represents a third-party system from which DataHub ingests metadata; every ingested entity is associated with a platform such as MySQL, Oracle, Hive, or HDFS.
But what if the metadata you want to import comes from a platform that DataHub does not support out of the box?
DataHub also supports ingestion through its REST API, with SDKs currently available for Python and Java. The examples below use the Java SDK.
1. Add the SDK
Add the dependency to pom.xml:
<!-- https://mvnrepository.com/artifact/io.acryl/datahub-client -->
<dependency>
  <groupId>io.acryl</groupId>
  <artifactId>datahub-client</artifactId>
  <!-- replace with the latest version number -->
  <version>0.0.1</version>
</dependency>
2. Add a data platform
When adding a data platform you can specify its name and logo; once the platform has been added and metadata has been ingested for it, it shows up on the DataHub home page.
Because this endpoint is simple, we call the HTTP API directly instead of going through the SDK:
# Add the SQLFlow data platform
curl 'http://localhost:8080/entities?action=ingest' -X POST --data '{
  "entity": {
    "value": {
      "com.linkedin.metadata.snapshot.DataPlatformSnapshot": {
        "aspects": [
          {
            "com.linkedin.dataplatform.DataPlatformInfo": {
              "datasetNameDelimiter": "/",
              "name": "SQLFlow",
              "type": "OTHERS",
              "logoUrl": "http://101.43.8.206/lineage/images/logo.png"
            }
          }
        ],
        "urn": "urn:li:dataPlatform:SQLFlow"
      }
    }
  }
}'
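To confirm the platform was created, the same entities resource can be read back with the URL-encoded URN; a quick check against the GMS API used above:
# Read back the SQLFlow platform entity
curl 'http://localhost:8080/entities/urn%3Ali%3AdataPlatform%3ASQLFlow'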
3. Add a dataset
3.1 Configure the emitter connection parameters
import datahub.client.rest.RestEmitter;
//...
RestEmitter emitter = RestEmitter.create(b -> b
    .server("http://localhost:8080")
    // Auth token for Managed DataHub:
    // .token(AUTH_TOKEN_IF_NEEDED)
    // Override the default timeout of 10 seconds:
    // .timeoutSec(OVERRIDE_DEFAULT_TIMEOUT_IN_SECONDS)
    // Add additional headers:
    // .extraHeaders(Collections.singletonMap("Session-token", "MY_SESSION"))
    // Customize the HttpClient's connection TTL:
    // .customizeHttpAsyncClient(c -> c.setConnectionTimeToLive(30, TimeUnit.SECONDS))
);
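Before emitting anything, it is worth a quick smoke test that GMS is reachable at the configured address; the /config endpoint served by GMS returns basic server information:
curl http://localhost:8080/config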
3.2 Define a dataset and upload it
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import datahub.client.Callback;
import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import org.apache.http.HttpResponse;
// ... followed by
// Creates the emitter with the default coordinates and settings
RestEmitter emitter = RestEmitter.createWithDefaults();
MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.builder()
    .entityType("dataset")
    .entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.user-table,PROD)")
    .upsert()
    // A minimal aspect carrying only a description; see 3.3 for a full schema aspect
    .aspect(new DatasetProperties().setDescription("This is the canonical User profile dataset"))
    .build();
// Non-blocking using callback
emitter.emit(mcpw, new Callback() {
@Override
public void onCompletion(MetadataWriteResponse response) {
if (response.isSuccess()) {
System.out.println(String.format("Successfully emitted metadata event for %s", mcpw.getEntityUrn()));
} else {
// Get the underlying http response
HttpResponse httpResponse = (HttpResponse) response.getUnderlyingResponse();
System.out.println(String.format("Failed to emit metadata event for %s, aspect: %s with status code: %d",
mcpw.getEntityUrn(), mcpw.getAspectName(), httpResponse.getStatusLine().getStatusCode()));
// Print the server side exception if it was captured
if (response.getServerException() != null) {
System.out.println(String.format("Server side exception was %s", response.getServerException()));
}
}
}
@Override
public void onFailure(Throwable exception) {
System.out.println(
String.format("Failed to emit metadata event for %s, aspect: %s due to %s", mcpw.getEntityUrn(),
mcpw.getAspectName(), exception.getMessage()));
}
});
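If you do not need the asynchronous callback, emit() also returns a Future that you can simply block on; a minimal sketch (add java.util.concurrent.Future to the imports):
// Blocking call using the returned Future
Future<MetadataWriteResponse> requestFuture = emitter.emit(mcpw, null);
MetadataWriteResponse response = requestFuture.get();
System.out.println(response.isSuccess());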
3.3 Define the schema metadata
Note: the schema must not contain Chinese characters; ingestion fails with an error if any are present.
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.schema.OtherSchema;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaFieldArray;
import com.linkedin.schema.SchemaFieldDataType;
import com.linkedin.schema.SchemaMetadata;
// ...
// `list` holds one Map per column with keys: schemaname, tablename, columnname, external_type
SchemaMetadata metadata = new SchemaMetadata();
String schemaname = list.get(0).get("schemaname");
String tablename = list.get(0).get("tablename");
metadata.setSchemaName(schemaname);
SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
OtherSchema schema = new OtherSchema();
schema.setRawSchema("SQLFlow");
platformSchema.setOtherSchema(schema);
metadata.setPlatformSchema(platformSchema);
DataPlatformUrn urn = new DataPlatformUrn("SQLFlow");
metadata.setPlatform(urn);
metadata.setVersion(0);
metadata.setHash(hashKeyForDisk(key)); // hashKeyForDisk: our own helper that hashes the table key
SchemaFieldArray fieldArray = new SchemaFieldArray();
metadata.setFields(fieldArray);
for (Map<String, String> map : list) {
    SchemaField field = new SchemaField();
    field.setDescription("");
    field.setFieldPath(map.get("columnname"));
    field.setNativeDataType(map.get("external_type"));
    // getDataType: our own mapping from the native type string to SchemaFieldDataType
    SchemaFieldDataType dataType = getDataType(field.getNativeDataType());
    field.setType(dataType);
    fieldArray.add(field);
}
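The finished SchemaMetadata aspect is emitted exactly like the DatasetProperties aspect in 3.2. A sketch, assuming the dataset URN is assembled from the schemaname and tablename read above:
MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.builder()
    .entityType("dataset")
    .entityUrn("urn:li:dataset:(urn:li:dataPlatform:SQLFlow," + schemaname + "." + tablename + ",PROD)")
    .upsert()
    .aspect(metadata)
    .build();
emitter.emit(mcpw, null).get();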
4. Delete datasets
Datasets ingested for the SQLFlow platform can be removed in bulk with the DataHub CLI:
datahub delete --env PROD --entity_type dataset --platform SQLFlow
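To delete a single dataset rather than everything on the platform, the CLI also accepts an explicit URN; the dataset name below is a placeholder:
datahub delete --urn 'urn:li:dataset:(urn:li:dataPlatform:SQLFlow,myschema.mytable,PROD)'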