geth的启动之整体及p2p服务的启动

1.入口

geth命令的实现在文件
go-ethereum/cmd/geth/main.go中
调用关系

main => app.Run(os.Args)

其中app在main.go的全局变量中初始化

//go-ethereum/cmd/geth/main.go
// app is the CLI application object; it is created by utils.NewApp from the
// git commit and the usage string below.
app = utils.NewApp(gitCommit, "the go-ethereum command line interface")

//go-ethereum/cmd/utils/flags.go

// NewApp creates a cli.App named after the running executable. Its version is
// params.Version, extended with "-" and the first 8 characters of gitCommit
// when gitCommit is at least 8 characters long. Author and Email are set to
// empty strings, and Usage is set to the given usage text.
func NewApp(gitCommit, usage string) *cli.App {
    // Work out the version string up front.
    version := params.Version
    if len(gitCommit) >= 8 {
        version = version + "-" + gitCommit[:8]
    }

    application := cli.NewApp()
    application.Name = filepath.Base(os.Args[0])
    application.Usage = usage
    application.Version = version
    application.Author = ""
    application.Email = ""
    return application
}

cli的实现采用的是 gopkg.in/urfave/cli.v1 库(文档:https://godoc.org/gopkg.in/urfave/cli.v1 ),
该库的用法待学习

2.init()


// init configures the package-level CLI app: its default action, metadata,
// the list of sub-commands (sorted by name), the global flag groups, and the
// Before/After hooks.
func init() {
    // geth is the action used when no special sub-command is given.
    app.Action = geth
    app.HideVersion = true // a dedicated command prints the version instead
    app.Copyright = "Copyright 2013-2017 The go-ethereum Authors"

    app.Commands = []cli.Command{
        // chaincmd.go
        initCommand,
        importCommand,
        exportCommand,
        copydbCommand,
        removedbCommand,
        dumpCommand,
        // monitorcmd.go
        monitorCommand,
        // accountcmd.go
        accountCommand,
        walletCommand,
        // consolecmd.go
        consoleCommand,
        attachCommand,
        javascriptCommand,
        // misccmd.go
        makecacheCommand,
        makedagCommand,
        versionCommand,
        bugCommand,
        licenseCommand,
        // config.go
        dumpConfigCommand,
    }
    sort.Sort(cli.CommandsByName(app.Commands))

    // Append every flag group, in this order, to the app's flag list.
    flagGroups := [][]cli.Flag{
        nodeFlags,
        rpcFlags,
        consoleFlags,
        debug.Flags,
        whisperFlags,
    }
    for _, group := range flagGroups {
        app.Flags = append(app.Flags, group...)
    }

    // Before hook: set GOMAXPROCS to the CPU count, set up the debug
    // facilities, launch process metrics collection (3 second interval) in a
    // goroutine, and set up the network.
    app.Before = func(c *cli.Context) error {
        runtime.GOMAXPROCS(runtime.NumCPU())
        err := debug.Setup(c)
        if err != nil {
            return err
        }
        go metrics.CollectProcessMetrics(3 * time.Second)

        utils.SetupNetwork(c)
        return nil
    }

    // After hook: shut down the debug facilities and close the console's
    // stdin, which resets the terminal mode.
    app.After = func(c *cli.Context) error {
        debug.Exit()
        console.Stdin.Close()
        return nil
    }
}

3.geth(ctx *cli.Context)

// geth is the default action, used when the binary is run without a special
// sub-command. It builds a node from the command line context, starts it, and
// then blocks until the node has been shut down. It always returns nil.
func geth(ctx *cli.Context) error {
    stack := makeFullNode(ctx)
    startNode(ctx, stack)
    stack.Wait()
    return nil
}
  • makeFullNode:生成 node 节点(node/node.go)并注册服务,见 4-6
  • startNode:启动 node 上的所有服务(node/node.go),如 p2p、rpc、http、数据同步等,见 7

4.makeFullNode(ctx *cli.Context) *node.Node

//cmd/geth/config.go:156

// makeFullNode builds the node from the CLI context and registers services on
// it: the Ethereum service always, the dashboard when its flag is enabled,
// Whisper when enabled explicitly or implied by developer mode, and the
// Ethereum Stats daemon when a stats URL is configured.
func makeFullNode(ctx *cli.Context) *node.Node {
    stack, conf := makeConfigNode(ctx)

    // Register services, starting with the Ethereum service.
    utils.RegisterEthService(stack, &conf.Eth)

    dashboardOn := ctx.GlobalBool(utils.DashboardEnabledFlag.Name)
    if dashboardOn {
        utils.RegisterDashboardService(stack, &conf.Dashboard, gitCommit)
    }

    // Whisper must be enabled explicitly (enableWhisper), or it is enabled
    // automatically when the developer flag is set and the whisper-enabled
    // flag was not set.
    explicit := enableWhisper(ctx)
    implicit := ctx.GlobalIsSet(utils.DeveloperFlag.Name) && !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name)
    if explicit || implicit {
        shh := &conf.Shh
        // Flags given on the command line override the configured values.
        if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
            shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
        }
        if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
            shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
        }
        utils.RegisterShhService(stack, shh)
    }

    // The Ethereum Stats daemon is added only when a URL was requested.
    if statsURL := conf.Ethstats.URL; statsURL != "" {
        utils.RegisterEthStatsService(stack, statsURL)
    }
    return stack
}

5.RegisterEthService(stack *node.Node, cfg *eth.Config)

//cmd/utils/flags.go:1132
// RegisterEthService adds an Ethereum client to the stack.
//
// When cfg.SyncMode is downloader.LightSync the registered constructor builds
// a light client via les.New. Otherwise it builds a full node via eth.New
// and, when cfg.LightServ > 0, attaches a LES server to it. A failure to
// register the constructor is fatal (reported through Fatalf).
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
    var err error
    if cfg.SyncMode == downloader.LightSync {
        err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
            return les.New(ctx, cfg)
        })
    } else {
        err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
            fullNode, err := eth.New(ctx, cfg)
            if fullNode != nil && cfg.LightServ > 0 {
                // Do not discard the LES server error: attaching a server
                // that failed to construct would hide the failure from the
                // caller.
                ls, lesErr := les.NewLesServer(fullNode, cfg)
                if lesErr != nil {
                    return nil, lesErr
                }
                fullNode.AddLesServer(ls)
            }
            return fullNode, err
        })
    }
    if err != nil {
        Fatalf("Failed to register the Ethereum service: %v", err)
    }
}

6. eth.New

//eth/backend.go:104
// New creates a new Ethereum object (including the
// initialisation of the common Ethereum object).
//
// It rejects light sync mode and invalid sync modes, opens the "chaindata"
// database, sets up the genesis block, and then builds the blockchain, bloom
// indexer, transaction pool, protocol manager, miner and API backend in that
// order. On any failure it returns a nil *Ethereum together with the error.
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
    // Light sync is served by a different type; refuse it here.
    if config.SyncMode == downloader.LightSync {
        return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
    }
    if !config.SyncMode.IsValid() {
        return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
    }
    chainDb, err := CreateDB(ctx, config, "chaindata")
    if err != nil {
        return nil, err
    }
    // NOTE(review): presumably starts a database upgrade whose stop handle is
    // kept on the Ethereum object below — confirm against upgradeDeduplicateData.
    stopDbUpgrade := upgradeDeduplicateData(chainDb)
    chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
    // Any genesis error other than *params.ConfigCompatError aborts here; a
    // compatibility error is handled further down, once the blockchain exists.
    if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
        return nil, genesisErr
    }
    log.Info("Initialised chain configuration", "config", chainConfig)

    eth := &Ethereum{
        config:         config,
        chainDb:        chainDb,
        chainConfig:    chainConfig,
        eventMux:       ctx.EventMux,
        accountManager: ctx.AccountManager,
        engine:         CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
        shutdownChan:   make(chan bool),
        stopDbUpgrade:  stopDbUpgrade,
        networkId:      config.NetworkId,
        gasPrice:       config.GasPrice,
        etherbase:      config.Etherbase,
        bloomRequests:  make(chan chan *bloombits.Retrieval),
        bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks),
    }

    log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)

    // Unless the check is skipped, a stored blockchain version that is
    // non-zero and differs from core.BlockChainVersion is an error; otherwise
    // the current version is written to the database.
    if !config.SkipBcVersionCheck {
        bcVersion := core.GetBlockChainVersion(chainDb)
        if bcVersion != core.BlockChainVersion && bcVersion != 0 {
            return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)
        }
        core.WriteBlockChainVersion(chainDb, core.BlockChainVersion)
    }
    var (
        vmConfig    = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
        cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
    )
    eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
    if err != nil {
        return nil, err
    }
    // Rewind the chain in case of an incompatible config upgrade, then
    // write the chain config for the genesis hash.
    if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
        log.Warn("Rewinding chain to upgrade configuration", "err", compat)
        eth.blockchain.SetHead(compat.RewindTo)
        core.WriteChainConfig(chainDb, genesisHash, chainConfig)
    }
    eth.bloomIndexer.Start(eth.blockchain)

    // A non-empty transaction journal path is resolved through the service
    // context before the transaction pool is created.
    if config.TxPool.Journal != "" {
        config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
    }
    eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)

    if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
        return nil, err
    }
    eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
    eth.miner.SetExtra(makeExtraData(config.ExtraData))

    // The API backend is created first with a nil second field; its gas price
    // oracle (gpo) is assigned below, defaulting to config.GasPrice when the
    // GPO parameters carry no default.
    eth.ApiBackend = &EthApiBackend{eth, nil}
    gpoParams := config.GPO
    if gpoParams.Default == nil {
        gpoParams.Default = config.GasPrice
    }
    eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams)

    return eth, nil
}

7.node及startNode

node中包含了rpc,ipc,http,ws,p2p等各种服务及在5中所注册的服务

//node/node.go
// Node is a container on which services can be registered.
type Node struct {
    eventmux *event.TypeMux // Event multiplexer used between the services of a stack
    // Node configuration; Start reads the P2P settings, node key, node name,
    // static/trusted nodes and node database from it.
    config   *Config
    // Account manager; Start hands it to every service through ServiceContext.
    accman   *accounts.Manager

    ephemeralKeystore string         // if non-empty, the key directory that will be removed by Stop
    instanceDirLock   flock.Releaser // prevents concurrent use of instance directory

    // P2P server configuration; filled in by Start from config.P2P.
    serverConfig p2p.Config
    server       *p2p.Server // Currently running P2P networking layer
    // Registered services; registration is done in
    // func (n *Node) Register(constructor ServiceConstructor) error.
    serviceFuncs []ServiceConstructor     // Service constructors (in dependency order)
    services     map[reflect.Type]Service // Currently running services

    rpcAPIs       []rpc.API   // List of APIs currently provided by the node
    inprocHandler *rpc.Server // In-process RPC request handler to process the API requests

    ipcEndpoint string       // IPC endpoint to listen at (empty = IPC disabled)
    ipcListener net.Listener // IPC RPC listener socket to serve API requests
    ipcHandler  *rpc.Server  // IPC RPC request handler to process the API requests

    httpEndpoint  string       // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled)
    httpWhitelist []string     // HTTP RPC modules to allow through this endpoint
    httpListener  net.Listener // HTTP RPC listener socket to serve API requests
    httpHandler   *rpc.Server  // HTTP RPC request handler to process the API requests

    wsEndpoint string       // Websocket endpoint (interface + port) to listen at (empty = websocket disabled)
    wsListener net.Listener // Websocket RPC listener socket to serve API requests
    wsHandler  *rpc.Server  // Websocket RPC request handler to process the API requests

    stop chan struct{} // Channel to wait for termination notifications
    // Guards the node's state; Start holds it for writing for its whole duration.
    lock sync.RWMutex

    // Logger used by the node; Start also passes it to the P2P server config.
    log log.Logger
}

startNode 最终调用的 (n *Node) Start() 如下

// Start creates a live P2P node and starts running it. It opens the data
// directory, prepares the p2p server configuration, constructs every
// registered service, starts the p2p server, starts the services, and finally
// starts the RPC interfaces. It returns ErrNodeRunning when the node already
// has a running server.
func (n *Node) Start() error {
    n.lock.Lock()
    defer n.lock.Unlock()

    // A non-nil server means the node is already running.
    if n.server != nil {
        return ErrNodeRunning
    }
    if err := n.openDataDir(); err != nil {
        return err
    }

    // Prepare the p2p server configuration from the node configuration.
    // Static nodes, trusted nodes and the node database are taken from the
    // node config only when the P2P config left them unset.
    n.serverConfig = n.config.P2P
    p2pCfg := &n.serverConfig
    p2pCfg.PrivateKey = n.config.NodeKey()
    p2pCfg.Name = n.config.NodeName()
    p2pCfg.Logger = n.log
    if p2pCfg.StaticNodes == nil {
        p2pCfg.StaticNodes = n.config.StaticNodes()
    }
    if p2pCfg.TrustedNodes == nil {
        p2pCfg.TrustedNodes = n.config.TrustedNodes()
    }
    if p2pCfg.NodeDatabase == "" {
        p2pCfg.NodeDatabase = n.config.NodeDB()
    }
    srv := &p2p.Server{Config: *p2pCfg}
    n.log.Info("Starting peer-to-peer node", "instance", p2pCfg.Name)

    // Run the service constructors in registration order. Each constructor
    // receives its own context holding a copy of the services built so far.
    built := make(map[reflect.Type]Service)
    for _, construct := range n.serviceFuncs {
        // The map is copied so that each context owns its own snapshot
        // (needed for threaded access).
        snapshot := make(map[reflect.Type]Service)
        for k, svc := range built {
            snapshot[k] = svc
        }
        svcCtx := &ServiceContext{
            config:         n.config,
            services:       snapshot,
            EventMux:       n.eventmux,
            AccountManager: n.accman,
        }
        svc, err := construct(svcCtx)
        if err != nil {
            return err
        }
        // Two services of the same concrete type are not allowed.
        svcType := reflect.TypeOf(svc)
        if _, dup := built[svcType]; dup {
            return &DuplicateServiceError{Kind: svcType}
        }
        built[svcType] = svc
    }

    // Collect the protocols of all services and start the p2p server.
    for _, svc := range built {
        srv.Protocols = append(srv.Protocols, svc.Protocols()...)
    }
    if err := srv.Start(); err != nil {
        return convertFileLockError(err)
    }

    // Start the services one by one. On failure, stop the ones that were
    // already started, stop the p2p server, and report the error.
    var started []reflect.Type
    for svcType, svc := range built {
        if err := svc.Start(srv); err != nil {
            for _, prev := range started {
                built[prev].Stop()
            }
            srv.Stop()

            return err
        }
        started = append(started, svcType)
    }

    // Lastly start the configured RPC interfaces; on failure everything
    // started above is stopped again.
    if err := n.startRPC(built); err != nil {
        for _, svc := range built {
            svc.Stop()
        }
        srv.Stop()
        return err
    }

    // Publish the running state on the node.
    n.services = built
    n.server = srv
    n.stop = make(chan struct{})

    return nil
}
    原文作者:古则
    原文地址: https://www.jianshu.com/p/f04fd9095e74
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞