Data Integration Connector Toolkit
- Data Integration Connector Toolkit
- All Products
ASOOperation targetASO = (ASOOperation)currentObj; PushdownRuntimeMetadata pushdownMetadata = targetASO.getASOPushdownRuntimeMetadata();
List<ASOOperation> sourceAso = pushdownMetadata.getAsoOperationList(); //Get a list of native sources and check for a source with extensionID other than MySQL for(ASOOperation aso:sourceAso){ String extentionID = ((SD_ASOOperation)aso)._get_imfObject().getBaseASO().getExtensionId(); if(!extentionID.equalsIgnoreCase("com.infa.adapter.mysql_cloud")){ Utils.createPushdownValidationError(currentObj,"Ecosystem Pushdown is not supported for Combination of connectors MySQL and "+extentionID); return false; } }
//Get the NMOTypeContext object to create a catalog & add records NMOTypeContext SourceNMO= new NMOTypeContext(pushdownMetadata.getAsoOperationList().get(0).getASO().getNmoType(),"Provide Target AdapterID); //Get the Catalog to add the Source/Lookup records Catalog catalog = SDKPushdownUtils.getCatalog(SourceNMO);
List<OperationBase> mergeOperation = new ArrayList<>(); List<OperationBase> baseOperations = pushdownMetadata.getAsoOperationList().get(0).getReadProjection().getBaseOperations(OperationTypeEnum.JOIN); if (baseOperations != null && baseOperations.size() > 0) { List<OperationBase> ops = pushdownMetadata.getAsoOperationList().get(0).getReadProjection().getBaseOperations(OperationTypeEnum.CONVERSION); for (OperationBase op : ops) { mergeOperation.add(op); } } else{ List<OperationBase> opsNJ = pushdownMetadata.getAsoOperationList().get(0).getReadProjection().getBaseOperations(OperationTypeEnum.CONVERSION); for (OperationBase op : opsNJ) { mergeOperation.add(op); } }
//List of records for source List<Record> listOfRecords=new ArrayList<Record>(); for(OperationBase operationBase : mergeOperation) { Factory sdkFactory = catalog.getFactory(); FlatRecord flatRecord = sdkFactory.newFlatRecord(catalog); Operation operation = (Operation) operationBase; flatRecord.setName(tableName); flatRecord.setNativeName(tableName); List<FieldBase> fieldList = (((SD_ConversionOperation) operation).getOperationFields()); //Populate the Flat record fields created for this source for(FieldBase fld:fieldList) { Field field = sdkFactory.newField(catalog); if(fld instanceof SAD_DerivedField) { SD_DerivedField dField=(SD_DerivedField)fld; //Field nativeFld = (Field)fld; field.setName(dField.getName()); field.setNativeName(dField.getName()); field.setDataType(dField.getTypeName()); field.setPrecision(dField.getPrecision()); field.setScale(dField.getScale()); flatRecord.addField(field); } } catalog.addRootRecord(flatRecord); //Add it to the list of source records listOfRecords.add(flatRecord) }
List<OperationBase> lookupOpBase = new ArrayList<OperationBase>(); List<ASOOperation> asoOps = pushdownMetadata.getLookupASOOperationList(); if (asoOps != null && !asoOps.isEmpty()) { for (ASOOperation eachASOOp : asoOps) { List<OperationBase> ops = eachASOOp.getReadProjection().getBaseOperations(OperationTypeEnum.CONVERSION); lookupOpBase.add(ops.get(0)); } }
List<Record> lookupRecord=new ArrayList<Record>(); for(OperationBase operationBase : lookupOpBase) { Factory sdkFactory = catalog.getFactory(); FlatRecord flatRecord = sdkFactory.newFlatRecord(catalog); Operation operation = (Operation) operationBase; flatRecord.setName(tableName); flatRecord.setNativeName(tableName); List<FieldBase> fieldList = (((SD_ConversionOperation) operation).getOperationFields()); //Populate the Flat record fields created for this source for(FieldBase fld:fieldList) { Field field = sdkFactory.newField(catalog); if(fld instanceof SAD_DerivedField) { SD_DerivedField dField=(SD_DerivedField)fld; //Field nativeFld = (Field)fld; field.setName(dField.getName()); field.setNativeName(dField.getName()); field.setDataType(dField.getTypeName()); field.setPrecision(dField.getPrecision()); field.setScale(dField.getScale()); flatRecord.addField(field); } } catalog.addRootRecord(flatRecord); //Add it to the list of source records lookupRecord.add(flatRecord) }
//MySQL_Cloud is an example of the engine object to be passed as an argument for creating PushdownUtilsContext MySQL_CloudEngine mySQLEng = new MySQL_CloudEngine(new MySQL_CloudASOOperationObjMgr()); PushdownUtilsContext pdctx=null; //Create a new PushdownUtilsContext object based on whether a lookup source is used the mapping if(lookupOpBase!=null&&!lookupRecord.isEmpty()) { pdctx= new PushdownUtilsContext(mySQLEng, pushdownMetadata.getAsoOperationList().get(0).getReadProjection(), ((ASOOperation)currentObj).getWriteProjection(), ctx, mergeOperation, listOfRecords, lookupOpBase, lookupRecord); } else { pdctx= new PushdownUtilsContext(mySQLEng, pushdownMetadata.getAsoOperationList().get(0).getReadProjection(), ((ASOOperation)currentObj).getWriteProjection(), ctx, mergeOperation, listOfRecords); }
try { PushdownSql pdoSQL=SDKPushdownUtils.getPushdownSDKSQL(pdctx); String finalOptmizedQuery=(String)pdoSQL.getOptimizedSql().get(0); pushdownMetadata.setSql(finalOptmizedQuery); if(finalOptmizedQuery.isEmpty()||finalOptmizedQuery==null) { Utils.createPushdownValidationError(currentObj,"Pushdown SQL is empty"); return false; } } } catch (ExpressionParseException e) {` Utils.createPushdownValidationError(currentObj,e.getMessage()); return false; } Utils.logMessage(currentObj,"Full Pushdown Query is : "+pushdownMetadata.getSql()); Utils.logMessage(currentObj,"Pushdown Enabled Successfully"); return true;
for (int i = 0; i < this.createStagingTableQuery.size(); i++) { pushdownMetadata.addCustomMetadata(pushdownMetadata.getCustomMetadataSize(), (String)this.createStagingTableQuery.get(i)); } /**Populate the query to create staging tables in custom metadata of PushdownRuntimeMetadataObject * The custom metadata can be accessed at runtime to execute these queries */ pushdownMetadata.addCustomMetadata(pushdownMetadata.getCustomMetadataSize(), "Partition"); for (int j = 0; j < this.dropStagingTableQuery.size(); j++) { pushdownMetadata.addCustomMetadata(pushdownMetadata.getCustomMetadataSize(), (String)this.dropStagingTableQuery.get(j)); }
private void getSourceConnectionAndReadAttrs(PushdownRuntimeMetadata pushdownMetadata, MetadataObject currentObj) { List<OperationBase> opsNativeSource=pushdownMetadata.getAsoOperationList().get(0).getReadProjection().getBaseOperations(OperationTypeEnum.NATIVE_SOURCE); for (OperationBase op : opsNativeSource) { Map<String, Object> srcConn=SDKPushdownUtils.getNativeSourceConnection(pushdownMetadata.getAsoOperationList().get(0).getReadProjection(),(NativeSource) op); String nativeSourceName=((NativeSource) op).getNativeRecord().getNativeName(); String nativeSourceCatalog=(String) srcConn.get("catalog"); String nativeSourceHost=(String) srcConn.get("host"); Utils.logMessage(currentObj,"Connection Attributes for Source "+nativeSourceName); Utils.logMessage(currentObj,"Catalog: "+nativeSourceCatalog+" Host:"+nativeSourceHost); Map<String, Object> nativeSourceReadAttrs=SDKPushdownUtils.getNativeSourceReadAttributes(pushdownMetadata.getAsoOperationList().get(0).getReadProjection(), (NativeSource) op); Utils.logMessage(currentObj,"Read Runtime Attributes for Source "+nativeSourceName); String nativeSourcePreSQL=(String) nativeSourceReadAttrs.get("preSQL"); String nativeSourcePostSQL=(String) nativeSourceReadAttrs.get("postSQL"); int nativeSourceRowOffSet= (int) nativeSourceReadAttrs.get("rowOffSet"); int nativeSourceRowLimit= (int) nativeSourceReadAttrs.get("rowLimit"); long nativeSourceThresholdLimit=(long) nativeSourceReadAttrs.get("thresholdLimit"); Utils.logMessage(currentObj,"PreSQL: "+nativeSourcePreSQL); Utils.logMessage(currentObj,"PostSQL: "+nativeSourcePostSQL); Utils.logMessage(currentObj,"RowOffSet: "+nativeSourceRowOffSet); Utils.logMessage(currentObj,"RowLimit: "+nativeSourceRowLimit); Utils.logMessage(currentObj,"RowLimit: "+nativeSourceThresholdLimit); } }
//Pushdown query execution for MySQL_Cloud is given below PushdownRuntimeMetadata pushdownMeta = dsoHandle.getAdapterDataSourceOperation().getASOPushdownRuntimeMetadata(); if (pushdownMeta != null) { int j=0; for(int i=0;i<pushdownMeta.getCustomMetadataSize();i++){ String createStagingTableQuery = pushdownMeta.getCustomMetadata(i); if(createStagingTableQuery.equalsIgnoreCase("partition")){ j=i; break; } writeLog(EMessageLevel.MSG_INFO, "Create Staging Table for PDO with Query:" + createStagingTableQuery); tstmt.executeUpdate(createStagingTableQuery); } writeLog(EMessageLevel.MSG_INFO, "Full PDO Query is:" + pushdownMeta.getSql()); rowInserted=tstmt.executeUpdate(pushdownMeta.getSql()); for(int k=j+1;k<pushdownMeta.getCustomMetadataSize();k++){ writeLog(EMessageLevel.MSG_INFO, "Dropping Staging Table for PDO with Query:" + pushdownMeta.getCustomMetadata(k)); tstmt.executeUpdate(pushdownMeta.getCustomMetadata(k)); } if(rowInserted > 0){ //Set the RowStats for executed PDO Query RowsStatInfo rowstatInfoPDO = runtimeConfigHandleInit.getRowsStatInfo(EIUDIndicator.INSERT); rowstatInfoPDO.incrementRequested(rowInserted); rowstatInfoPDO.incrementAffected(rowInserted); rowstatInfoPDO.incrementApplied(rowInserted); } } // closing statements and connection to data source } catch (Exception e) { writeLog(EMessageLevel.MSG_ERROR, e1.getMessage()); return EStatus.FAILURE; }
public MySQL_CloudTypeHandler getTypeHandler() { return new MySQL_CloudTypeHandler(); } /** * connector specific QueryVisitor instance should be returned here by connector specific implementation of MD_ASOOperation **/ @Override public XVisitor<SdkXNode, SDKExprNode> getSDKXVisitor(XVisitorContext ctx) { return new MySQL_CloudXVisitor<SdkXNode>(ctx.getLog(), ctx.getScope()); } @Override public SDKPostProcessor getQueryPostProcessor(PostProcessorContext ctx) { return new MySQL_CloudPostProc(ctx); } /** * Get connector specific Engine (used by CDI-e pushdown framework currently) * @return connector specific engine */ public SDKEngine getAdapterEngine(SDKEngineContext ctx) { return new MySQL_CloudEngine(); }