Edge Data Streaming
- Edge Data Streaming 2.3.1
- All Products
package ...;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.json.JSONArray;
import org.json.JSONObject;

import com.informatica.vds.api.VDSConfiguration;
import com.informatica.vds.api.VDSEventList;
import com.informatica.vds.api.VDSPluginStatistics;
import com.informatica.vds.api.VDSSource;

// NOTE(review): IPluginRetryPolicy is referenced by setRetryPolicyHandler but is not
// imported in this sample. It must either live in this package or be imported
// explicitly -- confirm against the plugin API.

/**
 * Sample custom source that emits a fixed batch of generated text messages and
 * publishes two demo counters through {@link VDSPluginStatistics}.
 *
 * <p>On {@link #open(VDSConfiguration)} the source pre-fills an in-memory queue with
 * {@value #QUEUE_CAPACITY} messages built from the configured {@code newField} value
 * followed by an index. Each {@link #read(VDSEventList)} call drains at most one
 * message from that queue.
 *
 * <p>NOTE(review): {@code statValues} is a plain {@link HashMap} written by
 * {@code read} and read by {@code getStatistics}. If the framework invokes those
 * methods from different threads, this needs a concurrent map -- confirm the
 * threading model.
 */
public class CustomSource implements VDSSource, VDSPluginStatistics {

    /** Capacity of the message queue and number of messages generated in open(). */
    private static final int QUEUE_CAPACITY = 100;

    /** Statistic id updated on every second event. */
    private static final short EVERY_SECOND_EVENT_STAT_ID = 2;

    /** Statistic id updated on every third event. */
    private static final short EVERY_THIRD_EVENT_STAT_ID = 3;

    /** Value of the "newField" configuration entry; used as the message prefix. */
    private String txtField;

    /** Pre-generated messages waiting to be emitted by read(). */
    private BlockingQueue<String> queue;

    /** Current value of each statistic, keyed by statistic id. */
    private Map<Short, Long> statValues = new HashMap<Short, Long>();

    /** Number of events emitted so far. */
    private long count;

    /** Number of times the event count was a multiple of two. */
    private long statistic1;

    /** Number of times the event count was a multiple of three. */
    private long statistic2;

    /**
     * Closes the source. This sample holds no external resources, so it only logs.
     */
    @Override
    public void close() throws IOException {
        System.out.println("Closing custom source...");
    }

    /**
     * Reads the plugin configuration, registers every configured statistic id with an
     * initial value of zero, and fills the message queue.
     *
     * @param arg0 plugin configuration; must provide the string entries
     *             {@code "newField"} and {@code "statistic"}, the latter being a JSON
     *             object that contains a {@code "statistic"} array whose elements carry
     *             a string {@code "id"}
     * @throws Exception if the statistic JSON cannot be parsed or an id is not a valid
     *                   {@code short}
     */
    @Override
    public void open(VDSConfiguration arg0) throws Exception {
        System.out.println("Opening custom source...");
        txtField = arg0.getString("newField");

        String pluginStats = arg0.getString("statistic");
        JSONArray pluginStatsJsonArray = new JSONObject(pluginStats).getJSONArray("statistic");
        for (int i = 0; i < pluginStatsJsonArray.length(); i++) {
            JSONObject stat = pluginStatsJsonArray.getJSONObject(i);
            short pluginStatKey = Short.parseShort(stat.getString("id"));
            statValues.put(Short.valueOf(pluginStatKey), Long.valueOf(0L));
        }

        queue = new LinkedBlockingQueue<String>(QUEUE_CAPACITY);
        for (int i = 0; i < QUEUE_CAPACITY; i++) {
            queue.offer(txtField + i);
        }
    }

    /**
     * Emits at most one queued message as an event and updates the demo statistics.
     *
     * <p>Returns without adding an event once the queue is empty.
     *
     * @param arg0 event list that receives the message bytes
     * @throws Exception if the thread is interrupted while sleeping or the event
     *                   cannot be added
     */
    @Override
    public void read(VDSEventList arg0) throws Exception {
        System.out.println("Reading source...");
        // Short pause before each poll; presumably throttles the sample -- confirm.
        Thread.sleep(10);

        String message = queue.poll();
        if (message == null) {
            return;
        }
        count++;

        // Encode once with an explicit charset so the payload does not depend on the
        // platform default encoding.
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        arg0.addEvent(payload, payload.length);

        if (count % 2 == 0) {
            statValues.put(EVERY_SECOND_EVENT_STAT_ID, ++statistic1);
        }
        if (count % 3 == 0) {
            statValues.put(EVERY_THIRD_EVENT_STAT_ID, ++statistic2);
        }

        // Log the message text; concatenating the byte[] would only print its identity.
        System.out.println("Data " + message + " " + payload.length);
    }

    /**
     * Returns the current value of each requested statistic.
     *
     * @param arg0 statistic ids to look up
     * @return an array parallel to {@code arg0}; an id with no recorded value yields 0
     */
    @Override
    public long[] getStatistics(short[] arg0) {
        long[] statistics = new long[arg0.length];
        for (int i = 0; i < arg0.length; i++) {
            // Guard against ids that were never configured or updated: unboxing a
            // missing (null) entry would throw NullPointerException.
            Long value = statValues.get(Short.valueOf(arg0[i]));
            statistics[i] = (value == null) ? 0L : value.longValue();
        }
        return statistics;
    }

    /**
     * Receives the retry policy handler. This sample ignores it.
     *
     * @param arg0 retry policy handler supplied by the framework; unused
     */
    @Override
    public void setRetryPolicyHandler(IPluginRetryPolicy arg0) {
        // No-op: this sample does not use the retry policy handler.
    }
}