1. 程式人生 > >28 友盟大資料--flume原始碼檢視分析- ExecSource

28 友盟大資料--flume原始碼檢視分析- ExecSource

  1 //
  2 // Source code recreated from a .class file by IntelliJ IDEA
  3 // (powered by Fernflower decompiler)
  4 //
  5 
  6 package org.apache.flume.source;
  7 
  8 import com.google.common.base.Preconditions;
  9 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 10 import java.io.BufferedReader;
11 import java.io.IOException; 12 import java.io.InputStreamReader; 13 import java.nio.charset.Charset; 14 import java.util.ArrayList; 15 import java.util.List; 16 import java.util.concurrent.ExecutorService; 17 import java.util.concurrent.Executors; 18 import java.util.concurrent.Future; 19
import java.util.concurrent.ScheduledExecutorService; 20 import java.util.concurrent.ScheduledFuture; 21 import java.util.concurrent.TimeUnit; 22 import org.apache.flume.Context; 23 import org.apache.flume.Event; 24 import org.apache.flume.EventDrivenSource; 25 import org.apache.flume.SystemClock;
26 import org.apache.flume.channel.ChannelProcessor; 27 import org.apache.flume.conf.Configurable; 28 import org.apache.flume.event.EventBuilder; 29 import org.apache.flume.instrumentation.SourceCounter; 30 import org.slf4j.Logger; 31 import org.slf4j.LoggerFactory; 32 33 public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable { 34 private static final Logger logger = LoggerFactory.getLogger(ExecSource.class); 35 private String shell; 36 private String command; 37 private SourceCounter sourceCounter; 38 private ExecutorService executor; 39 private Future<?> runnerFuture; 40 private long restartThrottle; 41 private boolean restart; 42 private boolean logStderr; 43 private Integer bufferCount; 44 private long batchTimeout; 45 private ExecSource.ExecRunnable runner; 46 private Charset charset; 47 48 public ExecSource() { 49 } 50 51 public void start() { 52 logger.info("Exec source starting with command:{}", this.command); 53 this.executor = Executors.newSingleThreadExecutor(); 54 this.runner = new ExecSource.ExecRunnable(this.shell, this.command, this.getChannelProcessor(), this.sourceCounter, this.restart, this.restartThrottle, this.logStderr, this.bufferCount, this.batchTimeout, this.charset); 55 this.runnerFuture = this.executor.submit(this.runner); 56 this.sourceCounter.start(); 57 super.start(); 58 logger.debug("Exec source started"); 59 } 60 61 public void stop() { 62 logger.info("Stopping exec source with command:{}", this.command); 63 if (this.runner != null) { 64 this.runner.setRestart(false); 65 this.runner.kill(); 66 } 67 68 if (this.runnerFuture != null) { 69 logger.debug("Stopping exec runner"); 70 this.runnerFuture.cancel(true); 71 logger.debug("Exec runner stopped"); 72 } 73 74 this.executor.shutdown(); 75 76 while(!this.executor.isTerminated()) { 77 logger.debug("Waiting for exec executor service to stop"); 78 79 try { 80 this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS); 81 } catch (InterruptedException 
var2) { 82 logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting."); 83 Thread.currentThread().interrupt(); 84 } 85 } 86 87 this.sourceCounter.stop(); 88 super.stop(); 89 logger.debug("Exec source with command:{} stopped. Metrics:{}", this.command, this.sourceCounter); 90 } 91 92 public void configure(Context context) { 93 this.command = context.getString("command"); 94 Preconditions.checkState(this.command != null, "The parameter command must be specified"); 95 this.restartThrottle = context.getLong("restartThrottle", 10000L); 96 this.restart = context.getBoolean("restart", false); 97 this.logStderr = context.getBoolean("logStdErr", false); 98 this.bufferCount = context.getInteger("batchSize", 20); 99 this.batchTimeout = context.getLong("batchTimeout", 3000L); 100 this.charset = Charset.forName(context.getString("charset", "UTF-8")); 101 this.shell = context.getString("shell", (String)null); 102 if (this.sourceCounter == null) { 103 this.sourceCounter = new SourceCounter(this.getName()); 104 } 105 106 } 107 108 private static class StderrReader extends Thread { 109 private BufferedReader input; 110 private boolean logStderr; 111 112 protected StderrReader(BufferedReader input, boolean logStderr) { 113 this.input = input; 114 this.logStderr = logStderr; 115 } 116 117 public void run() { 118 try { 119 int i = 0; 120 String line = null; 121 122 while((line = this.input.readLine()) != null) { 123 if (this.logStderr) { 124 Logger var10000 = ExecSource.logger; 125 ++i; 126 var10000.info("StderrLogger[{}] = '{}'", i, line); 127 } 128 } 129 } catch (IOException var11) { 130 ExecSource.logger.info("StderrLogger exiting", var11); 131 } finally { 132 try { 133 if (this.input != null) { 134 this.input.close(); 135 } 136 } catch (IOException var10) { 137 ExecSource.logger.error("Failed to close stderr reader for exec source", var10); 138 } 139 140 } 141 142 } 143 } 144 145 private static class ExecRunnable implements Runnable { 146 private 
final String shell; 147 private final String command; 148 private final ChannelProcessor channelProcessor; 149 private final SourceCounter sourceCounter; 150 private volatile boolean restart; 151 private final long restartThrottle; 152 private final int bufferCount; 153 private long batchTimeout; 154 private final boolean logStderr; 155 private final Charset charset; 156 private Process process = null; 157 private SystemClock systemClock = new SystemClock(); 158 private Long lastPushToChannel; 159 ScheduledExecutorService timedFlushService; 160 ScheduledFuture<?> future; 161 162 public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, long batchTimeout, Charset charset) { 163 this.lastPushToChannel = this.systemClock.currentTimeMillis(); 164 this.command = command; 165 this.channelProcessor = channelProcessor; 166 this.sourceCounter = sourceCounter; 167 this.restartThrottle = restartThrottle; 168 this.bufferCount = bufferCount; 169 this.batchTimeout = batchTimeout; 170 this.restart = restart; 171 this.logStderr = logStderr; 172 this.charset = charset; 173 this.shell = shell; 174 } 175 176 public void run() { 177 do { 178 String exitCode = "unknown"; 179 BufferedReader reader = null; 180 String line = null; 181 final List<Event> eventList = new ArrayList(); 182 this.timedFlushService = Executors.newSingleThreadScheduledExecutor((new ThreadFactoryBuilder()).setNameFormat("timedFlushExecService" + Thread.currentThread().getId() + "-%d").build()); 183 184 try { 185 String[] commandArgs; 186 if (this.shell != null) { 187 commandArgs = formulateShellCommand(this.shell, this.command); 188 this.process = Runtime.getRuntime().exec(commandArgs); 189 } else { 190 commandArgs = this.command.split("\\s+"); 191 this.process = (new ProcessBuilder(commandArgs)).start(); 192 } 193 194 reader = new BufferedReader(new 
InputStreamReader(this.process.getInputStream(), this.charset)); 195 ExecSource.StderrReader stderrReader = new ExecSource.StderrReader(new BufferedReader(new InputStreamReader(this.process.getErrorStream(), this.charset)), this.logStderr); 196 stderrReader.setName("StderrReader-[" + this.command + "]"); 197 stderrReader.setDaemon(true); 198 stderrReader.start(); 199 this.future = this.timedFlushService.scheduleWithFixedDelay(new Runnable() { 200 public void run() { 201 try { 202 List var1 = eventList; 203 synchronized(eventList) { 204 if (!eventList.isEmpty() && ExecRunnable.this.timeout()) { 205 ExecRunnable.this.flushEventBatch(eventList); 206 } 207 } 208 } catch (Exception var4) { 209 ExecSource.logger.error("Exception occured when processing event batch", var4); 210 if (var4 instanceof InterruptedException) { 211 Thread.currentThread().interrupt(); 212 } 213 } 214 215 } 216 }, this.batchTimeout, this.batchTimeout, TimeUnit.MILLISECONDS); 217 218 while((line = reader.readLine()) != null) { 219 synchronized(eventList) { 220 this.sourceCounter.incrementEventReceivedCount(); 221 eventList.add(EventBuilder.withBody(line.getBytes(this.charset))); 222 if (eventList.size() >= this.bufferCount || this.timeout()) { 223 this.flushEventBatch(eventList); 224 } 225 } 226 } 227 228 synchronized(eventList) { 229 if (!eventList.isEmpty()) { 230 this.flushEventBatch(eventList); 231 } 232 } 233 } catch (Exception var23) { 234 ExecSource.logger.error("Failed while running command: " + this.command, var23); 235 if (var23 instanceof InterruptedException) { 236 Thread.currentThread().interrupt(); 237 } 238 } finally { 239 if (reader != null) { 240 try { 241 reader.close(); 242 } catch (IOException var19) { 243 ExecSource.logger.error("Failed to close reader for exec source", var19); 244 } 245 } 246 247 exitCode = String.valueOf(this.kill()); 248 } 249 250 if (this.restart) { 251 ExecSource.logger.info("Restarting in {}ms, exit code {}", this.restartThrottle, exitCode); 252 253 try { 
254 Thread.sleep(this.restartThrottle); 255 } catch (InterruptedException var20) { 256 Thread.currentThread().interrupt(); 257 } 258 } else { 259 ExecSource.logger.info("Command [" + this.command + "] exited with " + exitCode); 260 } 261 } while(this.restart); 262 263 } 264 265 private void flushEventBatch(List<Event> eventList) { 266 this.channelProcessor.processEventBatch(eventList);//通道處理器 詳細見下面程式碼 267 this.sourceCounter.addToEventAcceptedCount((long)eventList.size()); 268 eventList.clear(); 269 this.lastPushToChannel = this.systemClock.currentTimeMillis(); 270 } 271 272 private boolean timeout() { 273 return this.systemClock.currentTimeMillis() - this.lastPushToChannel >= this.batchTimeout; 274 } 275 276 private static String[] formulateShellCommand(String shell, String command) { 277 String[] shellArgs = shell.split("\\s+"); 278 String[] result = new String[shellArgs.length + 1]; 279 System.arraycopy(shellArgs, 0, result, 0, shellArgs.length); 280 result[shellArgs.length] = command; 281 return result; 282 } 283 284 public int kill() { 285 if (this.process != null) { 286 Process var1 = this.process; 287 synchronized(this.process) { 288 this.process.destroy(); 289 290 int var10000; 291 try { 292 int exitValue = this.process.waitFor(); 293 if (this.future != null) { 294 this.future.cancel(true); 295 } 296 297 if (this.timedFlushService != null) { 298 this.timedFlushService.shutdown(); 299 300 while(!this.timedFlushService.isTerminated()) { 301 try { 302 this.timedFlushService.awaitTermination(500L, TimeUnit.MILLISECONDS); 303 } catch (InterruptedException var5) { 304 ExecSource.logger.debug("Interrupted while waiting for exec executor service to stop. 
Just exiting."); 305 Thread.currentThread().interrupt(); 306 } 307 } 308 } 309 310 var10000 = exitValue; 311 } catch (InterruptedException var6) { 312 Thread.currentThread().interrupt(); 313 return -2147483648; 314 } 315 316 return var10000; 317 } 318 } else { 319 return -1073741824; 320 } 321 } 322 323 public void setRestart(boolean restart) { 324 this.restart = restart; 325 } 326 } 327 }

 

ChannelProcessor 的 processEventBatch() 方法原始碼:

  1  public void processEventBatch(List<Event> events) {
  2         Preconditions.checkNotNull(events, "Event list must not be null");
  3         events = this.interceptorChain.intercept(events);//攔截器鏈---攔截事件
  4         Map<Channel, List<Event>> reqChannelQueue = new LinkedHashMap();
  5         Map<Channel, List<Event>> optChannelQueue = new LinkedHashMap();
  6         Iterator i$ = events.iterator();
  7 
  8         List batch;
  9         Iterator i$;
 10         while(i$.hasNext()) {
 11             Event event = (Event)i$.next();
 12             List<Channel> reqChannels = this.selector.getRequiredChannels(event);
 13 
 14             Object eventQueue;
 15             for(Iterator i$ = reqChannels.iterator(); i$.hasNext(); ((List)eventQueue).add(event)) {
 16                 Channel ch = (Channel)i$.next();
 17                 eventQueue = (List)reqChannelQueue.get(ch);
 18                 if (eventQueue == null) {
 19                     eventQueue = new ArrayList();
 20                     reqChannelQueue.put(ch, eventQueue);
 21                 }
 22             }
 23 
 24             batch = this.selector.getOptionalChannels(event);
 25 
 26             Object eventQueue;
 27             for(i$ = batch.iterator(); i$.hasNext(); ((List)eventQueue).add(event)) {
 28                 Channel ch = (Channel)i$.next();
 29                 eventQueue = (List)optChannelQueue.get(ch);
 30                 if (eventQueue == null) {
 31                     eventQueue = new ArrayList();
 32                     optChannelQueue.put(ch, eventQueue);
 33                 }
 34             }
 35         }
 36 
 37         i$ = reqChannelQueue.keySet().iterator();
 38 
 39         Channel optChannel;
 40         Transaction tx;
 41         Event event;
 42         while(i$.hasNext()) {
 43             optChannel = (Channel)i$.next();
 44             tx = optChannel.getTransaction();
 45             Preconditions.checkNotNull(tx, "Transaction object must not be null");
 46 
 47             try {
 48                 tx.begin();
 49                 batch = (List)reqChannelQueue.get(optChannel);
 50                 i$ = batch.iterator();
 51 
 52                 while(i$.hasNext()) {
 53                     event = (Event)i$.next();
 54                     optChannel.put(event);
 55                 }
 56 
 57                 tx.commit();
 58             } catch (Throwable var23) {
 59                 tx.rollback();
 60                 if (var23 instanceof Error) {
 61                     LOG.error("Error while writing to required channel: " + optChannel, var23);
 62                     throw (Error)var23;
 63                 }
 64 
 65                 if (var23 instanceof ChannelException) {
 66                     throw (ChannelException)var23;
 67                 }
 68 
 69                 throw new ChannelException("Unable to put batch on required channel: " + optChannel, var23);
 70             } finally {
 71                 if (tx != null) {
 72                     tx.close();
 73                 }
 74 
 75             }
 76         }
 77 
 78         i$ = optChannelQueue.keySet().iterator();
 79 
 80         while(i$.hasNext()) {
 81             optChannel = (Channel)i$.next();
 82             tx = optChannel.getTransaction();
 83             Preconditions.checkNotNull(tx, "Transaction object must not be null");
 84 
 85             try {
 86                 tx.begin();
 87                 batch = (List)optChannelQueue.get(optChannel);
 88                 i$ = batch.iterator();
 89 
 90                 while(i$.hasNext()) {
 91                     event = (Event)i$.next();
 92                     optChannel.put(event);
 93                 }
 94 
 95                 tx.commit();
 96             } catch (Throwable var21) {
 97                 tx.rollback();
 98                 LOG.error("Unable to put batch on optional channel: " + optChannel, var21);
 99                 if (var21 instanceof Error) {
100                     throw (Error)var21;
101                 }
102             } finally {
103                 if (tx != null) {
104                     tx.close();
105                 }
106 
107             }
108         }
109 
110     }

參照主機名攔截器 HostInterceptor 的寫法,可以實現 Interceptor 介面來編寫限速攔截器。以下是 HostInterceptor 的原始碼:

  1 //
  2 // Source code recreated from a .class file by IntelliJ IDEA
  3 // (powered by Fernflower decompiler)
  4 //
  5 
  6 package org.apache.flume.interceptor;
  7 
  8 import java.net.InetAddress;
  9 import java.net.UnknownHostException;
 10 import java.util.Iterator;
 11 import java.util.List;
 12 import java.util.Map;
 13 import org.apache.flume.Context;
 14 import org.apache.flume.Event;
 15 import org.slf4j.Logger;
 16 import org.slf4j.LoggerFactory;
 17 
 18 public class HostInterceptor implements Interceptor {
 19     private static final Logger logger = LoggerFactory.getLogger(HostInterceptor.class);
 20     private final boolean preserveExisting;
 21     private final String header;
 22     private String host;
 23 
 24     private HostInterceptor(boolean preserveExisting, boolean useIP, String header) {
 25         this.host = null;
 26         this.preserveExisting = preserveExisting;
 27         this.header = header;
 28 
 29         try {
 30             InetAddress addr = InetAddress.getLocalHost();
 31             if (useIP) {
 32                 this.host = addr.getHostAddress();
 33             } else {
 34                 this.host = addr.getCanonicalHostName();
 35             }
 36         } catch (UnknownHostException var6) {
 37             logger.warn("Could not get local host address. Exception follows.", var6);
 38         }
 39 
 40     }
 41 
 42     public void initialize() {
 43     }
 44 
 45     public Event intercept(Event event) {
 46         Map<String, String> headers = event.getHeaders();
 47         if (this.preserveExisting && headers.containsKey(this.header)) {
 48             return event;
 49         } else {
 50             if (this.host != null) {
 51                 headers.put(this.header, this.host);
 52             }
 53 
 54             return event;
 55         }
 56     }
 57 
 58     public List<Event> intercept(List<Event> events) {
 59         Iterator i$ = events.iterator();
 60 
 61         while(i$.hasNext()) {
 62             Event event = (Event)i$.next();
 63             this.intercept(event);
 64         }
 65 
 66         return events;
 67     }
 68 
 69     public void close() {
 70     }
 71 
 72     public static class Constants {
 73         public static String HOST = "host";
 74         public static String PRESERVE = "preserveExisting";
 75         public static boolean PRESERVE_DFLT = false;
 76         public static String USE_IP = "useIP";
 77         public static boolean USE_IP_DFLT = true;
 78         public static String HOST_HEADER = "hostHeader";
 79 
 80         public Constants() {
 81         }
 82     }
 83 
 84     public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder {
 85         private boolean preserveExisting;
 86         private boolean useIP;
 87         private String header;
 88 
 89         public Builder() {
 90             this.preserveExisting = HostInterceptor.Constants.PRESERVE_DFLT;
 91             this.useIP = HostInterceptor.Constants.USE_IP_DFLT;
 92             this.header = HostInterceptor.Constants.HOST;
 93         }
 94 
 95         public Interceptor build() {
 96             return new HostInterceptor(this.preserveExisting, this.useIP, this.header);
 97         }
 98 
 99         public void configure(Context context) {
100             this.preserveExisting = context.getBoolean(HostInterceptor.Constants.PRESERVE, HostInterceptor.Constants.PRESERVE_DFLT);
101             this.useIP = context.getBoolean(HostInterceptor.Constants.USE_IP, HostInterceptor.Constants.USE_IP_DFLT);
102             this.header = context.getString(HostInterceptor.Constants.HOST_HEADER, HostInterceptor.Constants.HOST);
103         }
104     }
105 }